WIP: experiments with throttle cache #621

Closed
wants to merge 8 commits into from
31 changes: 31 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,36 @@
## Changelog
##### master
* bump google.golang.org/grpc to fix vulnerability GHSA-m425-mq94-257g by @KacperLegowski in https://github.com/go-graphite/go-carbon/pull/574
* Actions bump and fpm fix by @deniszh in https://github.com/go-graphite/go-carbon/pull/580
* Add dependabot config by @RincewindsHat in https://github.com/go-graphite/go-carbon/pull/583
* Remove old otel dependency, upgrade deps by @deniszh in https://github.com/go-graphite/go-carbon/pull/586
* Adding HTTP GET handler for health check by @deniszh in https://github.com/go-graphite/go-carbon/pull/588
* Using cuckoo filter for new metric detection instead of cache by @deniszh in https://github.com/go-graphite/go-carbon/pull/590
* Let's not delete values from bloom filter by @deniszh in https://github.com/go-graphite/go-carbon/pull/593
* fix: panic on slice bounds out of range when preparing data stream by @dowster in https://github.com/go-graphite/go-carbon/pull/599
* Speed up fetchData by @deniszh in https://github.com/go-graphite/go-carbon/pull/601
* Make throughput quota config per minute by @emadolsky in https://github.com/go-graphite/go-carbon/pull/612

###### dependabot updates
* Bump github/codeql-action from 2 to 3 by @dependabot in https://github.com/go-graphite/go-carbon/pull/584
* Bump golangci/golangci-lint-action from 4 to 5 by @dependabot in https://github.com/go-graphite/go-carbon/pull/585
* Bump golangci/golangci-lint-action from 5 to 6 by @dependabot in https://github.com/go-graphite/go-carbon/pull/587
* Bump docker/build-push-action from 5 to 6 by @dependabot in https://github.com/go-graphite/go-carbon/pull/598
* Bump google.golang.org/grpc from 1.64.0 to 1.64.1 by @dependabot in https://github.com/go-graphite/go-carbon/pull/600
* Bump google.golang.org/grpc from 1.64.1 to 1.65.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/602
* Bump github.com/BurntSushi/toml from 1.3.2 to 1.4.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/603
* Bump github.com/klauspost/compress from 1.17.8 to 1.17.9 by @dependabot in https://github.com/go-graphite/go-carbon/pull/604
* Bump google.golang.org/protobuf from 1.34.1 to 1.34.2 by @dependabot in https://github.com/go-graphite/go-carbon/pull/605
* Bump cloud.google.com/go/pubsub from 1.38.0 to 1.40.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/606
* Bump golang.org/x/net from 0.26.0 to 0.27.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/607
* Bump google.golang.org/api from 0.181.0 to 0.188.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/608
* Bump google.golang.org/api from 0.188.0 to 0.190.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/610
* Bump cloud.google.com/go/pubsub from 1.40.0 to 1.41.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/611
* Bump google.golang.org/api from 0.190.0 to 0.191.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/613
* Bump golang.org/x/net from 0.27.0 to 0.28.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/614
* Bump github.com/prometheus/client_golang from 1.19.1 to 1.20.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/616
* Bump google.golang.org/api from 0.191.0 to 0.192.0 by @dependabot in https://github.com/go-graphite/go-carbon/pull/617
* Bump github.com/IBM/sarama from 1.43.2 to 1.43.3 by @dependabot in https://github.com/go-graphite/go-carbon/pull/618

##### version 0.17.3
* Bump golang.org/x/net from 0.7.0 to 0.17.0 by @dependabot in #568
3 changes: 2 additions & 1 deletion README.md
@@ -169,7 +169,8 @@ max-size = 1000000
# "noop" - pick metrics to write in unspecified order,
# requires least CPU and improves cache responsiveness
write-strategy = "max"
-# If > 0 use bloom filter to detect new metrics instead of cache
+# If > 0 use bloom filter to detect new metrics instead of cache (EXPERIMENTAL)
+# works better for multi-million metrics installations
bloom-size = 0

[udp]
27 changes: 19 additions & 8 deletions cache/cache.go
@@ -11,11 +11,12 @@ import (
"sync/atomic"
"time"

cuckoo "github.com/seiflotfy/cuckoofilter"
"github.com/cespare/xxhash/v2"

"github.com/go-graphite/go-carbon/helper"
"github.com/go-graphite/go-carbon/points"
"github.com/go-graphite/go-carbon/tags"
"github.com/greatroar/blobloom"
)

type WriteStrategy int
@@ -60,8 +61,9 @@ type Cache struct {
droppedRealtimeIndex uint32 // new metrics failed to be indexed in realtime
}

-newMetricsChan chan string
-newMetricCf *cuckoo.Filter
+newMetricsChan chan string
+newMetricCf *blobloom.Filter
+newMetricCfCapacity uint64

throttle func(ps *points.Points, inCache bool) bool
}
@@ -135,9 +137,13 @@ func (c *Cache) SetMaxSize(maxSize uint32) {
}

// SetBloomSize of bloom filter
-func (c *Cache) SetBloomSize(bloomSize uint) {
+func (c *Cache) SetBloomSize(bloomSize uint64) {
if bloomSize > 0 {
-c.newMetricCf = cuckoo.NewFilter(bloomSize)
+c.newMetricCf = blobloom.NewOptimized(blobloom.Config{
+Capacity: bloomSize, // Expected number of keys.
+FPRate: 1e-4, // Accept one false positive per 10,000 lookups.
+})
+c.newMetricCfCapacity = bloomSize
}
}

@@ -162,7 +168,12 @@ func (c *Cache) Stat(send helper.StatCallback) {
send("notConfirmed", float64(c.NotConfirmedLength()))
// report elements in bloom filter
if c.newMetricCf != nil {
send("cfCount", float64(c.newMetricCf.Count()))
cfCount := c.newMetricCf.Cardinality()
if uint64(cfCount) > c.newMetricCfCapacity {
// full filter report +Inf cardinality
cfCount = float64(c.newMetricCfCapacity)
}
send("cfCount", cfCount)
}

helper.SendAndSubstractUint32("queries", &c.stat.queryCnt, send)
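blobloom's `Cardinality()` returns an estimate derived from the filter's bit occupancy and reports `+Inf` once the filter is saturated, hence the cap applied to the `cfCount` gauge above. The same idea as a small standalone helper (the helper is illustrative, not part of the PR; comparing in float64 avoids converting a possible `+Inf` to `uint64`):

```go
package cachestat

import "github.com/greatroar/blobloom"

// boundedCardinality returns the filter's estimated cardinality, capped at
// the configured capacity so that a saturated filter does not report +Inf.
func boundedCardinality(f *blobloom.Filter, capacity uint64) float64 {
	c := f.Cardinality() // float64; +Inf once the filter is saturated
	if c > float64(capacity) {
		return float64(capacity)
	}
	return c
}
```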
@@ -336,10 +347,10 @@ func (c *Cache) Add(p *points.Points) {
if c.newMetricsChan != nil && c.newMetricCf != nil {
// add metric to the new-metric channel if it is missing from the bloom filter,
// regardless of whether it is already present in the cache (new behaviour)
-if !c.newMetricCf.Lookup([]byte(p.Metric)) {
+if !c.newMetricCf.Has(xxhash.Sum64([]byte(p.Metric))) {
sendMetricToNewMetricChan(c, p.Metric)
}
-c.newMetricCf.Insert([]byte(p.Metric))
+c.newMetricCf.Add(xxhash.Sum64([]byte(p.Metric)))
}
atomic.AddInt32(&c.stat.size, int32(count))
}
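Unlike the cuckoo filter it replaces, blobloom stores no keys of its own: callers hash the metric name (here with xxhash) and pass the resulting 64-bit hash to `Has`/`Add`. A self-contained sketch of the detection pattern used in `Add` above, with an illustrative buffered channel standing in for `newMetricsChan` (the non-blocking send is an assumption of the sketch, not necessarily how `sendMetricToNewMetricChan` behaves):

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
	"github.com/greatroar/blobloom"
)

func main() {
	// The filter stores only 64-bit hashes, so metric names are hashed
	// with xxhash before every lookup and every insert.
	filter := blobloom.NewOptimized(blobloom.Config{
		Capacity: 1_000_000, // expected number of distinct metric names
		FPRate:   1e-4,      // roughly one false positive per 10,000 lookups
	})
	newMetrics := make(chan string, 16) // stand-in for the cache's newMetricsChan

	seen := func(name string) {
		h := xxhash.Sum64([]byte(name))
		if !filter.Has(h) {
			// Probably never seen before: notify the indexing side.
			select {
			case newMetrics <- name:
			default: // drop the notification rather than block the hot path
			}
		}
		filter.Add(h)
	}

	seen("carbon.agents.host.metricsReceived")
	seen("carbon.agents.host.metricsReceived") // already in the filter, no second notification
	fmt.Println(len(newMetrics))               // 1
}
```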
2 changes: 1 addition & 1 deletion cache/cache_test.go
@@ -25,7 +25,7 @@ func TestCache(t *testing.T) {
}

// check if new metric added to bloom filter
-if c.newMetricCf.Count() != 1 {
+if c.newMetricCf.Empty() {
t.FailNow()
}
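Because blobloom cannot report an exact element count, the assertion above falls back to `Empty()`. A hypothetical, slightly stronger check (not part of this PR; `addedMetric` stands for whatever name the test inserted earlier, and it would require importing `github.com/cespare/xxhash/v2`) could target the specific hash instead:

```go
// Hypothetical stronger assertion: the filter should contain the exact
// metric the test just added, not merely be non-empty.
if !c.newMetricCf.Has(xxhash.Sum64([]byte(addedMetric))) {
	t.FailNow()
}
```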

2 changes: 2 additions & 0 deletions carbon/app.go
@@ -555,6 +555,8 @@ func (app *App) Start() (err error) {
})

carbonserver.SetQuotas(app.Config.getCarbonserverQuotas(conf.Carbonserver.QuotaUsageReportFrequency.Value()))
// TODO: make it configurable
carbonserver.SetThrottleBloomCache(10*1000*1000, time.Duration(150*time.Second))
core.SetThrottle(carbonserver.ShouldThrottleMetric)
}
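The filter size (10 million entries) and TTL (150 s) are hard-coded for this experiment. A sketch of how the TODO might eventually be resolved; the `throttle-bloom-size`/`throttle-bloom-ttl` keys and config fields below are hypothetical and do not exist in this PR:

```go
// Hypothetical config keys (not part of this PR):
//
//   [carbonserver]
//   throttle-bloom-size = 10000000
//   throttle-bloom-ttl  = "2m30s"
//
// which would let the hard-coded call above become something like:
if conf.Carbonserver.ThrottleBloomSize > 0 {
	carbonserver.SetThrottleBloomCache(
		conf.Carbonserver.ThrottleBloomSize,
		conf.Carbonserver.ThrottleBloomTTL.Value(),
	)
}
```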

2 changes: 1 addition & 1 deletion carbon/config.go
@@ -81,7 +81,7 @@ type whisperConfig struct {
type cacheConfig struct {
MaxSize uint32 `toml:"max-size"`
WriteStrategy string `toml:"write-strategy"`
-BloomSize uint `toml:"bloom-size"`
+BloomSize uint64 `toml:"bloom-size"`
}

type carbonlinkConfig struct {
80 changes: 80 additions & 0 deletions carbonserver/carbonserver.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/cespare/xxhash/v2"
"io"
"math"
"net"
@@ -58,6 +59,7 @@ import (
"github.com/go-graphite/go-carbon/points"
grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/greatroar/blobloom"
"github.com/lomik/zapwriter"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
@@ -104,6 +106,8 @@ type metricStruct struct {
TrieFiles uint64
TrieDirs uint64
TrieCountNodesTimeNs uint64
throttleBloomACardinality uint64
throttleBloomBCardinality uint64
QuotaApplyTimeNs uint64
UsageRefreshTimeNs uint64

@@ -287,6 +291,12 @@ type CarbonserverListener struct {
quotaAndUsageMetrics chan []points.Points
quotaUsageReportFrequency time.Duration
maxCreatesPerSecond int
throttleBloomA *blobloom.Filter
throttleBloomB *blobloom.Filter
throttleBloomSize uint64
throttleBloomTTL time.Duration
timeToResetBloomA time.Time
timeToResetBloomB time.Time

interfalInfoCallbacks map[string]func() map[string]interface{}

@@ -499,6 +509,12 @@ func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *Carb
quotaAndUsageMetrics: make(chan []points.Points, 1),
apiPerPathRatelimiter: map[string]*ApiPerPathRatelimiter{},
fileListCacheVersion: FLCVersion1,
throttleBloomA: nil,
throttleBloomB: nil,
throttleBloomSize: 0,
throttleBloomTTL: time.Duration(0),
timeToResetBloomA: time.Now(),
timeToResetBloomB: time.Now(),
}
}

@@ -615,13 +631,75 @@ func (listener *CarbonserverListener) SetQuotas(quotas []*Quota) {
func (listener *CarbonserverListener) isQuotaEnabled() bool {
return listener.quotas != nil
}

func (listener *CarbonserverListener) SetThrottleBloomCache(bloomSize uint64, ttl time.Duration) {
if bloomSize > 0 && ttl > 0 {
c := blobloom.Config{
Capacity: bloomSize, // Expected number of keys.
FPRate: 1e-4, // Accept one false positive per 10,000 lookups.
}
listener.throttleBloomSize = bloomSize
listener.throttleBloomA = blobloom.NewOptimized(c)
listener.throttleBloomB = blobloom.NewOptimized(c)
listener.throttleBloomTTL = ttl
listener.timeToResetBloomA = time.Now().Add(ttl / 2)
listener.timeToResetBloomB = time.Now().Add(ttl)
}
}

func (listener *CarbonserverListener) checkMetricNotThrottledCache(hash uint64) bool {
if listener.throttleBloomA != nil && listener.throttleBloomB != nil {
if listener.throttleBloomA.Has(hash) || listener.throttleBloomB.Has(hash) {
return true
}
}
return false
}

func (listener *CarbonserverListener) setMetricNotThrottledCache(hash uint64) {
if listener.throttleBloomA != nil {
if time.Now().After(listener.timeToResetBloomA) {
// record cardinality before resetting bloom
c := uint64(listener.throttleBloomA.Cardinality())
if c > listener.throttleBloomSize {
c = listener.throttleBloomSize
}
atomic.StoreUint64(&listener.metrics.throttleBloomACardinality, c)
listener.timeToResetBloomA = time.Now().Add(listener.throttleBloomTTL)
listener.throttleBloomA.Clear()
}
listener.throttleBloomA.Add(hash)
}
if listener.throttleBloomB != nil {
if time.Now().After(listener.timeToResetBloomB) {
// record cardinality before resetting bloom
c := uint64(listener.throttleBloomB.Cardinality())
if c > listener.throttleBloomSize {
c = listener.throttleBloomSize
}
atomic.StoreUint64(&listener.metrics.throttleBloomBCardinality, c)
listener.throttleBloomB.Clear()
listener.timeToResetBloomB = time.Now().Add(listener.throttleBloomTTL)
}
listener.throttleBloomB.Add(hash)
}
}

func (listener *CarbonserverListener) ShouldThrottleMetric(ps *points.Points, inCache bool) bool {
fidx := listener.CurrentFileIndex()
if fidx == nil || fidx.trieIdx == nil {
return false
}

hash := xxhash.Sum64([]byte(ps.Metric))
if listener.checkMetricNotThrottledCache(hash) {
return false
}

var throttled = fidx.trieIdx.throttle(ps, inCache)
if !throttled {
listener.setMetricNotThrottledCache(hash)
}

return throttled
}
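Taken together, `checkMetricNotThrottledCache` and `setMetricNotThrottledCache` implement a rotating, approximate "recently allowed" set: filters A and B are cleared lazily on schedules offset by half the TTL, so a hash written to both keeps answering `Has` for roughly ttl/2 to ttl before the metric has to pass the trie-based quota check again. A condensed, single-goroutine sketch of the same rotation outside the listener (names are illustrative; the real code also records cardinality gauges before each clear):

```go
package main

import (
	"time"

	"github.com/greatroar/blobloom"
)

// rotatingSet remembers 64-bit hashes for roughly ttl/2 to ttl.
type rotatingSet struct {
	a, b           *blobloom.Filter
	ttl            time.Duration
	resetA, resetB time.Time
}

func newRotatingSet(capacity uint64, ttl time.Duration) *rotatingSet {
	cfg := blobloom.Config{Capacity: capacity, FPRate: 1e-4}
	now := time.Now()
	return &rotatingSet{
		a:      blobloom.NewOptimized(cfg),
		b:      blobloom.NewOptimized(cfg),
		ttl:    ttl,
		resetA: now.Add(ttl / 2), // A and B expire half a TTL apart
		resetB: now.Add(ttl),
	}
}

// Has reports whether h was added recently (subject to bloom false positives).
func (s *rotatingSet) Has(h uint64) bool {
	return s.a.Has(h) || s.b.Has(h)
}

// Add records h, lazily clearing whichever filter has passed its deadline.
func (s *rotatingSet) Add(h uint64) {
	now := time.Now()
	if now.After(s.resetA) {
		s.a.Clear()
		s.resetA = now.Add(s.ttl)
	}
	if now.After(s.resetB) {
		s.b.Clear()
		s.resetB = now.Add(s.ttl)
	}
	// Writing to both filters means the younger filter always holds the
	// entry, so clearing the older one never forgets a recent hash.
	s.a.Add(h)
	s.b.Add(h)
}

func main() {
	s := newRotatingSet(1_000_000, 150*time.Second)
	s.Add(42)
	_ = s.Has(42) // true for ~75 to 150 seconds, as long as Adds keep the rotation going
}
```

One consequence worth noting: a bloom false positive here (FPRate 1e-4) lets an over-quota metric skip throttling until the filters next rotate, which is the usual trade-off of caching an "allow" decision in an approximate set.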
@@ -1541,6 +1619,8 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {
if listener.isQuotaEnabled() {
senderRaw("quota_apply_time_ns", &listener.metrics.QuotaApplyTimeNs, send)
senderRaw("usage_refresh_time_ns", &listener.metrics.UsageRefreshTimeNs, send)
senderRaw("throttle_bloom_a_cardinality", &listener.metrics.throttleBloomACardinality, send)
senderRaw("throttle_bloom_b_cardinality", &listener.metrics.throttleBloomBCardinality, send)
}

sender("alloc", &alloc, send)
3 changes: 3 additions & 0 deletions go-carbon.conf.example
@@ -67,6 +67,9 @@ max-size = 1000000
# "noop" - pick metrics to write in unspecified order,
# requires least CPU and improves cache responsiveness
write-strategy = "max"
# If > 0 use bloom filter to detect new metrics instead of cache (EXPERIMENTAL)
# works better for multi-million metrics installations
bloom-size = 0

[udp]
listen = ":2003"
7 changes: 3 additions & 4 deletions go.mod
@@ -1,6 +1,6 @@
module github.com/go-graphite/go-carbon

-go 1.20
+go 1.21

require (
cloud.google.com/go/pubsub v1.41.0
@@ -34,7 +34,8 @@ require (
)

require (
-github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
+github.com/cespare/xxhash/v2 v2.3.0
+github.com/greatroar/blobloom v0.8.0
golang.org/x/net v0.28.0
google.golang.org/protobuf v1.34.2
)
@@ -46,9 +47,7 @@ require (
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/iam v1.1.12 // indirect
github.com/beorn7/perks v1.0.1 // indirect
-github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
-github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect