Skip to content

Commit 879b240

Browse files
author
Isaac Hier
committed
Add rate limiter option to Cassandra writer
Signed-off-by: Isaac Hier <ihier@uber.com>
1 parent 4a07b78 commit 879b240

File tree

5 files changed

+258
-1
lines changed

5 files changed

+258
-1
lines changed

internal/utils/rate_limiter.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) 2018 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package utils
16+
17+
import (
18+
"errors"
19+
"sync"
20+
"time"
21+
)
22+
23+
var (
	// ErrInvalidCreditsPerSecond is returned by NewRateLimiter when creditsPerSecond is not greater than zero.
	ErrInvalidCreditsPerSecond = errors.New("invalid credits per second, must be greater than zero")
	// ErrInvalidMaxBalance is returned by NewRateLimiter when maxBalance is not greater than zero.
	ErrInvalidMaxBalance = errors.New("invalid max balance, must be greater than zero")
)

// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
type RateLimiter interface {
	// CheckCredit tries to pay itemCost out of the current balance. It returns (true, 0) when the
	// item was paid for, or (false, wait) where wait is the time needed to accrue the missing credits.
	CheckCredit(itemCost float64) (bool, time.Duration)
}

// rateLimiter is the credit-balance implementation of RateLimiter. The embedded mutex guards
// balance and lastTick.
type rateLimiter struct {
	sync.Mutex

	creditsPerSecond float64
	balance          float64
	maxBalance       float64
	lastTick         time.Time

	// timeNow is the clock used by updateBalance; it is a field so tests can freeze or advance time.
	timeNow func() time.Time
}

// NewRateLimiter creates a new rate limiter based on leaky bucket algorithm, formulated in terms of a
// credits balance that is replenished every time CheckCredit() method is called (tick) by the amount proportional
// to the time elapsed since the last tick, up to a max of maxBalance. The limiter starts with a full balance.
// A call to CheckCredit() takes a cost of an item we want to pay with the balance. If the balance is at least
// the cost of the item, the item is "purchased" and the balance reduced, indicated by returned value of true
// with a wait time of zero. Otherwise the balance is unchanged and CheckCredit() returns false with the time
// needed to accrue the missing credits.
//
// This can be used to limit a rate of messages emitted by a service by instantiating the Rate Limiter with the
// max number of messages a service is allowed to emit per second, and calling CheckCredit(1.0) for each message
// to determine if the message is within the rate limit.
//
// It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to desired throughput
// as bytes/second, and calling CheckCredit() with the actual message size.
//
// Both creditsPerSecond and maxBalance must be greater than zero; otherwise ErrInvalidCreditsPerSecond or
// ErrInvalidMaxBalance is returned, respectively.
func NewRateLimiter(creditsPerSecond, maxBalance float64) (RateLimiter, error) {
	// Written as !(x > 0) so that zero and NaN are rejected along with negative values.
	// A zero rate would make CheckCredit divide by zero; a zero max balance would never grant credit.
	if !(creditsPerSecond > 0) {
		return nil, ErrInvalidCreditsPerSecond
	}
	if !(maxBalance > 0) {
		return nil, ErrInvalidMaxBalance
	}
	return &rateLimiter{
		creditsPerSecond: creditsPerSecond,
		balance:          maxBalance,
		maxBalance:       maxBalance,
		lastTick:         time.Now(),
		timeNow:          time.Now,
	}, nil
}

// CheckCredit implements RateLimiter. It refreshes the balance, then either deducts itemCost and
// returns (true, 0), or leaves the balance untouched and returns false together with the time it
// takes to accrue the missing credits at creditsPerSecond.
func (b *rateLimiter) CheckCredit(itemCost float64) (bool, time.Duration) {
	b.Lock()
	defer b.Unlock()
	b.updateBalance()
	// if we have enough credits to pay for current item, then reduce balance and allow
	if b.balance >= itemCost {
		b.balance -= itemCost
		return true, 0
	}
	creditsRemaining := itemCost - b.balance
	waitNanos := creditsRemaining * float64(time.Second.Nanoseconds()) / b.creditsPerSecond
	// Converting an out-of-range float to an integer type is not well defined, so clamp very long
	// waits to the largest representable duration instead of risking a negative value.
	const maxDuration = time.Duration(1<<63 - 1)
	if waitNanos >= float64(maxDuration) {
		return false, maxDuration
	}
	return false, time.Nanosecond * time.Duration(waitNanos)
}

// updateBalance credits the balance for the time elapsed since the last tick and caps it at maxBalance.
//
// N.B. Must be called while holding the lock.
func (b *rateLimiter) updateBalance() {
	// calculate how much time passed since the last tick, and update current tick
	currentTime := b.timeNow()
	elapsedTime := currentTime.Sub(b.lastTick)
	b.lastTick = currentTime
	// calculate how much credit we have accumulated since the last tick
	b.balance += elapsedTime.Seconds() * b.creditsPerSecond
	if b.balance > b.maxBalance {
		b.balance = b.maxBalance
	}
}

internal/utils/rate_limiter_test.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) 2018 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package utils
16+
17+
import (
18+
"testing"
19+
"time"
20+
21+
"github.com/stretchr/testify/assert"
22+
)
23+
24+
func TestRateLimiter(t *testing.T) {
25+
limiter := NewRateLimiter(2.0, 2.0)
26+
// stop time
27+
ts := time.Now()
28+
limiter.(*rateLimiter).lastTick = ts
29+
limiter.(*rateLimiter).timeNow = func() time.Time {
30+
return ts
31+
}
32+
ok, waitTime := limiter.CheckCredit(1.0)
33+
assert.True(t, ok)
34+
assert.Equal(t, time.Duration(0), waitTime)
35+
ok, waitTime = limiter.CheckCredit(1.0)
36+
assert.True(t, ok)
37+
assert.Equal(t, time.Duration(0), waitTime)
38+
ok, waitTime = limiter.CheckCredit(1.0)
39+
assert.False(t, ok)
40+
assert.Equal(t, time.Second/2, waitTime)
41+
// move time 250ms forward, not enough credits to pay for 1.0 item
42+
limiter.(*rateLimiter).timeNow = func() time.Time {
43+
return ts.Add(time.Second / 4)
44+
}
45+
ok, waitTime = limiter.CheckCredit(1.0)
46+
assert.False(t, ok)
47+
assert.Equal(t, time.Second/4, waitTime)
48+
// move time 500ms forward, now enough credits to pay for 1.0 item
49+
limiter.(*rateLimiter).timeNow = func() time.Time {
50+
return ts.Add(time.Second/4 + time.Second/2)
51+
}
52+
ok, waitTime = limiter.CheckCredit(1.0)
53+
assert.True(t, ok)
54+
assert.Equal(t, time.Duration(0), waitTime)
55+
ok, waitTime = limiter.CheckCredit(1.0)
56+
assert.False(t, ok)
57+
assert.Equal(t, time.Second/4, waitTime)
58+
// move time 5s forward, enough to accumulate credits for 10 messages, but it should still be capped at 2
59+
limiter.(*rateLimiter).lastTick = ts
60+
limiter.(*rateLimiter).timeNow = func() time.Time {
61+
return ts.Add(5 * time.Second)
62+
}
63+
for i := 0; i < 2; i++ {
64+
ok, waitTime = limiter.CheckCredit(1.0)
65+
assert.True(t, ok)
66+
assert.Equal(t, time.Duration(0), waitTime)
67+
}
68+
for i := 0; i < 3; i++ {
69+
ok, waitTime = limiter.CheckCredit(1.0)
70+
assert.False(t, ok)
71+
assert.Equal(t, time.Second/2, waitTime)
72+
}
73+
}
74+
75+
func TestMaxBalance(t *testing.T) {
76+
limiter := NewRateLimiter(0.1, 1.0)
77+
// stop time
78+
ts := time.Now()
79+
limiter.(*rateLimiter).lastTick = ts
80+
limiter.(*rateLimiter).timeNow = func() time.Time {
81+
return ts
82+
}
83+
// on initialization, should have enough credits for 1 message
84+
ok, waitTime := limiter.CheckCredit(1.0)
85+
assert.True(t, ok)
86+
assert.Equal(t, time.Duration(0), waitTime)
87+
88+
// move time 20s forward, enough to accumulate credits for 2 messages, but it should still be capped at 1
89+
limiter.(*rateLimiter).timeNow = func() time.Time {
90+
return ts.Add(time.Second * 20)
91+
}
92+
ok, waitTime = limiter.CheckCredit(1.0)
93+
assert.True(t, ok)
94+
assert.Equal(t, time.Duration(0), waitTime)
95+
ok, waitTime = limiter.CheckCredit(1.0)
96+
assert.False(t, ok)
97+
assert.Equal(t, 10*time.Second, waitTime)
98+
}

plugin/storage/cassandra/factory.go

+40-1
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@ package cassandra
1616

1717
import (
1818
"flag"
19+
"io"
20+
"time"
1921

2022
"github.com/spf13/viper"
2123
"github.com/uber/jaeger-lib/metrics"
2224
"go.uber.org/zap"
2325

26+
"github.com/jaegertracing/jaeger/internal/utils"
27+
"github.com/jaegertracing/jaeger/model"
2428
"github.com/jaegertracing/jaeger/pkg/cassandra"
2529
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
2630
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
@@ -101,7 +105,42 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
101105

102106
// CreateSpanWriter implements storage.Factory
103107
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
104-
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger), nil
108+
writer := cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger)
109+
if f.Options.writerRateLimit.operationsPerSecond == 0 && f.Options.writerRateLimit.maxBurst == 0 {
110+
return writer, nil
111+
}
112+
113+
rateLimiter, err := utils.NewRateLimiter(
114+
f.Options.writerRateLimit.operationsPerSecond,
115+
f.Options.writerRateLimit.maxBurst,
116+
)
117+
if err != nil {
118+
return nil, err
119+
}
120+
return newRateLimitedSpanWriter(writer, rateLimiter), nil
121+
}
122+
123+
func newRateLimitedSpanWriter(writer spanstore.Writer, rateLimiter utils.RateLimiter) spanstore.Writer {
124+
return &rateLimitedSpanWriter{
125+
writer: writer,
126+
rateLimiter: rateLimiter,
127+
}
128+
}
129+
130+
type rateLimitedSpanWriter struct {
131+
writer spanstore.Writer
132+
rateLimiter utils.RateLimiter
133+
io.Closer
134+
}
135+
136+
func (w *rateLimitedSpanWriter) WriteSpan(span *model.Span) error {
137+
const cost = 1.0
138+
ok, waitTime := w.rateLimiter.CheckCredit(cost)
139+
for !ok {
140+
time.Sleep(waitTime)
141+
ok, waitTime = w.rateLimiter.CheckCredit(cost)
142+
}
143+
return w.writer.WriteSpan(span)
105144
}
106145

107146
// CreateDependencyReader implements storage.Factory

plugin/storage/cassandra/options.go

+17
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ const (
4848

4949
// common storage settings
5050
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
51+
suffixWritesPerSecond = ".writer-rate-limit.operations-per-second"
52+
suffixMaxBurstWrites = ".writer-rate-limit.max-burst"
5153
)
5254

5355
// Options contains various type of Cassandra configs and provides the ability
@@ -57,6 +59,7 @@ type Options struct {
5759
primary *namespaceConfig
5860
others map[string]*namespaceConfig
5961
SpanStoreWriteCacheTTL time.Duration
62+
writerRateLimit rateLimitConfig
6063
}
6164

6265
// the Servers field in config.Configuration is a list, which we cannot represent with flags.
@@ -70,6 +73,12 @@ type namespaceConfig struct {
7073
Enabled bool
7174
}
7275

76+
// rateLimitConfig defines common arguments to a rate limiter.
// Factory.CreateSpanWriter passes both fields to utils.NewRateLimiter and skips rate limiting
// entirely when both are zero.
77+
type rateLimitConfig struct {
78+
operationsPerSecond float64 // passed to utils.NewRateLimiter as creditsPerSecond
79+
maxBurst float64 // passed to utils.NewRateLimiter as maxBalance
80+
}
81+
7382
// NewOptions creates a new Options struct.
7483
func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
7584
// TODO all default values should be defined via cobra flags
@@ -111,6 +120,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
111120
flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
112121
opt.SpanStoreWriteCacheTTL,
113122
"The duration to wait before rewriting an existing service or operation name")
123+
flagSet.Float64(opt.primary.namespace+suffixWritesPerSecond,
124+
opt.writerRateLimit.operationsPerSecond,
125+
"The number of writes per second using rate limiter")
126+
flagSet.Float64(opt.primary.namespace+suffixMaxBurstWrites,
127+
opt.writerRateLimit.maxBurst,
128+
"The maximum number of writes in a single burst using rate limiter")
114129
}
115130

116131
func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
@@ -201,6 +216,8 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
201216
cfg.initFromViper(v)
202217
}
203218
opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
219+
opt.writerRateLimit.operationsPerSecond = v.GetFloat64(opt.primary.namespace + suffixWritesPerSecond)
220+
opt.writerRateLimit.maxBurst = v.GetFloat64(opt.primary.namespace + suffixMaxBurstWrites)
204221
}
205222

206223
func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {

plugin/storage/cassandra/options_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ func TestOptionsWithFlags(t *testing.T) {
5858
"--cas.consistency=ONE",
5959
"--cas.proto-version=3",
6060
"--cas.socket-keep-alive=42s",
61+
"--cas.writer-rate-limit.operations-per-second=10",
62+
"--cas.writer-rate-limit.max-burst=1",
6163
// enable aux with a couple overrides
6264
"--cas-aux.enabled=true",
6365
"--cas-aux.keyspace=jaeger-archive",
@@ -82,4 +84,7 @@ func TestOptionsWithFlags(t *testing.T) {
8284
assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary")
8385
assert.Equal(t, 3, aux.ProtoVersion)
8486
assert.Equal(t, 42*time.Second, aux.SocketKeepAlive)
87+
88+
assert.Equal(t, 10.0, opts.writerRateLimit.operationsPerSecond)
89+
assert.Equal(t, 1.0, opts.writerRateLimit.maxBurst)
8590
}

0 commit comments

Comments
 (0)