Benchmarking Cache and WorkerPool implementations #7

Closed
wants to merge 15 commits into from
9 changes: 4 additions & 5 deletions algorithms.go
@@ -20,7 +20,6 @@ import (
"context"

"github.com/mailgun/holster/v4/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -35,8 +34,8 @@ import (

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
- tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
- defer tokenBucketTimer.ObserveDuration()
+ //tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
+ //defer tokenBucketTimer.ObserveDuration()

// Get rate limit from cache.
hashKey := r.HashKey()
@@ -258,8 +257,8 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq,

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
- leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
- defer leakyBucketTimer.ObserveDuration()
+ //leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
+ //defer leakyBucketTimer.ObserveDuration()

if r.Burst == 0 {
r.Burst = r.Limit
36 changes: 36 additions & 0 deletions bench/access.go
@@ -0,0 +1,36 @@
package bench

import (
"fmt"
"math/rand"
"testing"
)

func AccessStructure(b *testing.B, size int) {
var indexes = make([]int, size, size)
var arr = make([]int, size, size)
var hash = make(map[int]int)

//rand.Seed(size % 42)
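// Fill the containers and pre-compute a random access order up front, so the
// timed loops below measure only slice/map lookups rather than rand.Intn.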
for i := 0; i < size; i++ {
indexes[i] = rand.Intn(size)
arr[i] = i
hash[i] = i
}

b.ResetTimer()

b.Run(fmt.Sprintf("Array_%d", size), func(b *testing.B) {
for i := 0; i < b.N; i++ {
indx := indexes[i%size] % size
_ = arr[indx]
}
})

b.Run(fmt.Sprintf("Hash_%d", size), func(b *testing.B) {
for i := 0; i < b.N; i++ {
indx := indexes[i%size] % size
_ = hash[indx]
}
})
}
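For context, a minimal driver for this helper is sketched below; it mirrors the commented-out BenchmarkAccessStructure sweep in bench/bench_test.go but runs a single size. The size value is illustrative, not taken from this PR.

```go
package bench_test

import (
	"testing"

	"github.com/gubernator-io/gubernator/v2/bench"
)

// Sketch: compare slice vs. map lookups for one size. The full sweep over
// sizes is kept commented out in bench/bench_test.go.
func BenchmarkAccessStructure(b *testing.B) {
	bench.AccessStructure(b, 10_000)
}
```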
140 changes: 140 additions & 0 deletions bench/bench_test.go
@@ -0,0 +1,140 @@
package bench_test

import (
"fmt"
"runtime"
"testing"
"time"

"github.com/gubernator-io/gubernator/v2/bench"
)

//func BenchmarkAccessStructure(b *testing.B) {
// for _, size := range []int{1, 10, 100, 1000, 10000, 100000, 1000000} {
// bench.AccessStructure(b, size)
// }
//}

/*
BenchmarkWorkerPool/Write_100-10 373 380416 ns/op
BenchmarkParallel/NoCache_1-10 4955510 246.4 ns/op 4058316 ops/s
*/

func BenchmarkParallel(b *testing.B) {
benchCase := []int{1, 2, 4, 8, 12, 16, 20, 24, 28, 32}

fmt.Printf("Current Operating System has '%d' CPUs\n", runtime.NumCPU())
for _, p := range benchCase {
b.Run(fmt.Sprintf("Hashi_%d", p), func(b *testing.B) {
bench.HashiReadParallel(b, p)
})
}
for _, p := range benchCase {
b.Run(fmt.Sprintf("Mutex_%d", p), func(b *testing.B) {
bench.MutexReadParallel(b, p)
})
}

k := bench.GenerateRandomKeys()
c := time.Now().UnixNano() / 1_000_000
w, err := bench.OtterPreLoad(b.Name(), k)
if err != nil {
b.Fatal(err)
}
defer w.Close()

for _, p := range benchCase {
b.Run(fmt.Sprintf("OtterRead_%d", p), func(b *testing.B) {
bench.OtterReadParallel(b, p, w, &c, k)
})
}

for _, p := range benchCase {
b.Run(fmt.Sprintf("OtterWrite_%d", p), func(b *testing.B) {
bench.OtterWriteParallel(b, p)
})
}
for _, p := range benchCase {
b.Run(fmt.Sprintf("WorkerPool_%d", p), func(b *testing.B) {
bench.WorkerPoolReadParallel(b, p)
})
}
}

//func BenchmarkConcurrency(b *testing.B) {
// fmt.Printf("Current Operating System has '%d' CPUs\n", runtime.NumCPU())
// runtime.GOMAXPROCS(runtime.NumCPU())
//
// for _, con := range []int{1, 10, 100, 1000, 5_000, 10_000, 15_000, 20_000} {
// b.Run(fmt.Sprintf("NoCache_%d", con), func(b *testing.B) {
// bench.NoCache(b, con)
// })
// }
// for _, con := range []int{1, 10, 100, 1000, 5_000, 10_000, 15_000, 20_000} {
// b.Run(fmt.Sprintf("OtterWrite_%d", con), func(b *testing.B) {
// bench.OtterWrite(b, con)
// })
// }
// for _, con := range []int{1, 10, 100, 1000, 5_000, 10_000, 15_000, 20_000} {
// b.Run(fmt.Sprintf("OtterRead_%d", con), func(b *testing.B) {
// bench.OtterRead(b, con)
// })
// }
// for _, con := range []int{1, 10, 100, 1000, 5_000, 10_000, 15_000, 20_000} {
// b.Run(fmt.Sprintf("MutexRead_%d", con), func(b *testing.B) {
// bench.MutexRead(b, con)
// })
// }
// for _, con := range []int{1, 10, 100, 1000, 5_000, 10_000, 15_000, 20_000} {
// b.Run(fmt.Sprintf("MutexWrite_%d", con), func(b *testing.B) {
// bench.MutexWrite(b, con)
// })
// }
// for _, con := range []int{1, 10, 100, 1000, 5_000, 10_000, 15_000, 20_000} {
// b.Run(fmt.Sprintf("WorkerPoolRead_%d", con), func(b *testing.B) {
// bench.WorkerPoolRead(b, con)
// })
// }
// for _, con := range []int{1, 10, 100, 1000, 5_000, 10_000, 15_000, 20_000} {
// b.Run(fmt.Sprintf("WorkerPoolWrite_%d", con), func(b *testing.B) {
// bench.WorkerPoolWrite(b, con)
// })
// }
//}

// This is a baseline comparison to a similar test in /benchmark_test.go called 'BenchmarkServer/Thundering_herd-10'
//func BenchmarkThunderingHerd(b *testing.B) {
// // Ensure the size of the data in the pool is consistent throughout all concurrency levels
// const size = 1_000_000
// p := gubernator.NewWorkerPool(&gubernator.Config{
// CacheFactory: func(maxSize int) gubernator.Cache {
// return gubernator.NewLRUCache(maxSize)
// },
// CacheSize: size * 1_000_000,
// Workers: runtime.NumCPU(),
// Logger: logrus.New(),
// })
// ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
// defer cancel()
//
// createdAt := time.Now().UnixNano() / 1_000_000
//
// b.ResetTimer()
// fan := syncutil.NewFanOut(100)
//
// for n := 0; n < b.N; n++ {
// fan.Run(func(o interface{}) error {
// _, err := p.GetRateLimit(ctx, &gubernator.RateLimitReq{
// CreatedAt: &createdAt,
// UniqueKey: gubernator.RandomString(10),
// Name: b.Name(),
// }, gubernator.RateLimitReqState{})
// if err != nil {
// b.Errorf("Error in client.GetRateLimits: %s", err)
// }
// return nil
// }, nil)
// }
//
// fan.Wait()
//}
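GenerateRandomKeys and the cacheSize constant are used by the helpers in this package but do not appear in the hunks shown here. The sketch below is an assumption about their shape (the names mirror this PR; the body is not the PR's actual code), included only so the read/write helpers are easier to follow. From the repository root, something like `go test -bench BenchmarkParallel ./bench/` should exercise these paths.

```go
package bench

import "github.com/gubernator-io/gubernator/v2"

// Sketch (assumption): one random key per cache slot, so HashiReadParallel can
// assert that exactly cacheSize items end up in the pool. Assumes a
// package-level cacheSize constant, which this diff references but does not show.
func GenerateRandomKeys() []string {
	keys := make([]string, 0, cacheSize)
	for i := 0; i < cacheSize; i++ {
		keys = append(keys, gubernator.RandomString(10))
	}
	return keys
}
```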
74 changes: 74 additions & 0 deletions bench/hashi.go
@@ -0,0 +1,74 @@
package bench

import (
"context"
"math/rand"
"runtime"
"testing"
"time"

"github.com/gubernator-io/gubernator/v2"
)

func HashiReadParallel(b *testing.B, processors int) {
runtime.GOMAXPROCS(processors)

l := &MockLoader{}
p := gubernator.NewWorkerHashi(gubernator.Config{
CacheFactory: func(maxSize int) gubernator.Cache {
return gubernator.NewHashiCache(maxSize)
},
CacheSize: cacheSize * 1_000_000,
Loader: l,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Prefill the cache
createdAt := time.Now().UnixNano() / 1_000_000
keys := GenerateRandomKeys()
for _, k := range keys {
_, err := p.GetRateLimit(ctx, &gubernator.RateLimitReq{
CreatedAt: &createdAt,
Name: b.Name(),
Duration: 100_000,
UniqueKey: k,
}, gubernator.RateLimitReqState{})
if err != nil {
b.Fatal(err)
}
}

if err := p.Store(ctx); err != nil {
b.Fatal(err)
}

if l.Count != cacheSize {
b.Fatal("item count in pool does not match expected size")
}
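// len(keys) is assumed to be a power of two here, so that index & mask below
// acts as a cheap modulo over the key set; with a non-power-of-two length the
// mask would skip some keys.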

mask := len(keys) - 1
start := time.Now()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
index := int(rand.Uint32() & uint32(mask))

for pb.Next() {
_, err := p.GetRateLimit(ctx, &gubernator.RateLimitReq{
CreatedAt: &createdAt,
UniqueKey: keys[index&mask],
Duration: 100_000,
Name: b.Name(),
}, gubernator.RateLimitReqState{})
index++
if err != nil {
b.Error(err)
return
}
}

})
opsPerSec := float64(b.N) / time.Since(start).Seconds()
b.ReportMetric(opsPerSec, "ops/s")
}
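The *Parallel helpers report throughput by wrapping the parallel section in a manual time.Now()/time.Since pair. A small alternative, shown below as a sketch, derives the same ops/s figure from the benchmark's own timer; it assumes Go 1.20+ for (*testing.B).Elapsed, and reportOpsPerSec is a hypothetical helper name, not part of this PR.

```go
package bench

import "testing"

// reportOpsPerSec is meant to be called after b.RunParallel returns. b.N is
// the total number of iterations across all goroutines, and b.Elapsed reports
// the time measured while the benchmark timer was running.
func reportOpsPerSec(b *testing.B) {
	b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "ops/s")
}
```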
18 changes: 18 additions & 0 deletions bench/mock.go
@@ -0,0 +1,18 @@
package bench

import "github.com/gubernator-io/gubernator/v2"

type MockLoader struct {
Count int
}

func (l *MockLoader) Load() (chan *gubernator.CacheItem, error) {
panic("implement me")
}

func (l *MockLoader) Save(items chan *gubernator.CacheItem) error {
for range items {
l.Count++
}
return nil
}
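MockLoader only needs Save for these benchmarks: p.Store(ctx) drains the cache through Save, which is how HashiReadParallel checks that cacheSize items actually made it into the pool. Load can stay unimplemented because the pools are prefilled through GetRateLimit. If a benchmark ever needed Load, a no-op sketch (an assumption, not part of this PR) could look like:

```go
// Sketch: satisfy the Loader interface with an already-closed channel, so the
// pool starts empty instead of panicking.
func (l *MockLoader) Load() (chan *gubernator.CacheItem, error) {
	ch := make(chan *gubernator.CacheItem)
	close(ch)
	return ch, nil
}
```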