Skip to content

Commit

Permalink
First benchmark test
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 committed Sep 24, 2020
1 parent 4434d27 commit 328a3be
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 110 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
TAG=latest
LAKEFS_BLOCKSTORE_TYPE=local
TAG=0.10.2
LAKEFS_BLOCKSTORE_TYPE=local
236 changes: 236 additions & 0 deletions benchmarks/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package benchmarks

import (
"bytes"
"context"
"fmt"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/go-openapi/runtime"
"github.com/go-openapi/swag"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prom2json"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"github.com/thanhpk/randstr"
genclient "github.com/treeverse/lakefs/api/gen/client"
"github.com/treeverse/lakefs/api/gen/client/objects"
"github.com/treeverse/lakefs/api/gen/client/repositories"
"github.com/treeverse/lakefs/api/gen/models"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/testutil"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)

var (
logger logging.Logger
client *genclient.Lakefs
svc *s3.S3
)

func TestMain(m *testing.M) {
//benchmarkTests := flag.Bool("benchmark-tests", false, "Run benchmark tests")
//flag.Parse()
//if !*benchmarkTests {
// os.Exit(0)
//}

viper.SetDefault("parallelism_level", 500)
viper.SetDefault("files_amount", 10000)
viper.SetDefault("global_timeout", 30*time.Minute)

logger, client, svc = testutil.SetupTestingEnv("benchmark", "lakefs-benchmarking")
logger.Info("Setup succeeded, running the tests")

if code := m.Run(); code != 0 {
logger.Info("Tests run failed")
os.Exit(code)
}

scrapePrometheus()
}

var monitoredOps = map[string]bool{
"getObject": true,
"uploadObject": true,
}

func scrapePrometheus() {
lakefsEndpoint := viper.GetString("endpoint_url")
resp, err := http.DefaultClient.Get(lakefsEndpoint + "/metrics")
if err != nil {
panic(err)
}

ch := make(chan *dto.MetricFamily)
go func() { _ = prom2json.ParseResponse(resp, ch) }()
metrics := []*dto.Metric{}

for {
a, ok := <-ch
if !ok {
break
}

if *a.Name == "api_request_duration_seconds" {
for _, m := range a.Metric {
for _, label := range m.Label {
if *label.Name == "operation" && monitoredOps[*label.Value] {
metrics = append(metrics, m)
}
}
}
}
}

for _, m := range metrics {
fmt.Printf("%v\n", *m)
}
}

const (
contentSuffixLength = 32
//contentLength = 128 * 1024
contentLength = 1 * 1024
)

func TestBenchmarkLakeFS(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("global_timeout"))
defer cancel()

ns := viper.GetString("storage_namespace")
repoName := strings.ToLower(t.Name())
logger.WithFields(logging.Fields{
"repository": repoName,
"storage_namespace": ns,
"name": repoName,
}).Debug("Create repository for test")
_, err := client.Repositories.CreateRepository(repositories.NewCreateRepositoryParamsWithContext(ctx).
WithRepository(&models.RepositoryCreation{
DefaultBranch: "master",
ID: swag.String(repoName),
StorageNamespace: swag.String(ns),
}), nil)
require.NoErrorf(t, err, "failed to create repository '%s', storage '%s'", t.Name(), ns)

parallelism := viper.GetInt("parallelism_level")
filesAmount := viper.GetInt("files_amount")

contentPrefix := randstr.Hex(contentLength - contentSuffixLength)
failed := doInParallel(ctx, repoName, parallelism, filesAmount, contentPrefix, uploader)
logger.WithField("failedCount", failed).Info("Finished uploading files")

failed = doInParallel(ctx, repoName, parallelism, filesAmount, "", reader)
logger.WithField("failedCount", failed).Info("Finished reading files")

}

func doInParallel(ctx context.Context, repoName string, level, filesAmount int, contentPrefix string, do func(context.Context, chan string, string, string) int) int {
filesCh := make(chan string, level)
wg := sync.WaitGroup{}
var failed int64

for i := 0; i < level; i++ {
go func() {
wg.Add(1)
fail := do(ctx, filesCh, repoName, contentPrefix)
atomic.AddInt64(&failed, int64(fail))
wg.Done()
}()
}

for i := 1; i <= filesAmount; i++ {
filesCh <- strconv.Itoa(i)
}

close(filesCh)
wg.Wait()

return int(failed)
}

func uploader(ctx context.Context, ch chan string, repoName, contentPrefix string) int {
failed := 0
for {
select {
case <-ctx.Done():
return failed
case file, ok := <-ch:
if !ok {
// channel closed
return failed
}

// Making sure content isn't duplicated to avoid dedup mechanisms in lakeFS
content := contentPrefix + randstr.Hex(contentSuffixLength)
contentReader := runtime.NamedReader("content", strings.NewReader(content))

if err := linearRetry(func() error {
_, err := client.Objects.UploadObject(
objects.NewUploadObjectParamsWithContext(ctx).
WithRepository(repoName).
WithBranch("master").
WithPath(file).
WithContent(contentReader), nil)
return err
}); err != nil {
failed++
logger.WithField("fileNum", file).Error("Failed uploading file")
}
}
}
}

func reader(ctx context.Context, ch chan string, repoName, _ string) int {
failed := 0
for {
select {
case <-ctx.Done():
return failed
case file, ok := <-ch:
if !ok {
// channel closed
return failed
}

if err := linearRetry(func() error {
var b bytes.Buffer
_, err := client.Objects.GetObject(
objects.NewGetObjectParamsWithContext(ctx).
WithRepository(repoName).
WithRef("master").
WithPath(file), nil, &b)
return err
}); err != nil {
failed++
logger.WithField("fileNum", file).Error("Failed reading file")
}
}
}
}

const (
tries = 3
retryTimeout = 200 * time.Millisecond
)

func linearRetry(do func() error) error {
var err error
for i := 1; i <= tries; i++ {
if err = do(); err == nil {
return nil
}

if i != tries {
// skip sleep in the last iteration
time.Sleep(retryTimeout)
}
}
return err
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/ory/dockertest/v3 v3.6.0
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.11.1 // indirect
github.com/prometheus/prom2json v1.3.0
github.com/rakyll/statik v0.1.7
github.com/rs/xid v1.2.1
github.com/schollz/progressbar/v3 v3.3.4
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/jsonschema v0.0.0-20180308105923-f2c93856175a/go.mod h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/apache/arrow/go/arrow v0.0.0-20200601151325-b2287a20f230/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0=
Expand Down Expand Up @@ -923,6 +925,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/prom2json v1.3.0 h1:BlqrtbT9lLH3ZsOVhXPsHzFrApCTKRifB7gjJuypu6Y=
github.com/prometheus/prom2json v1.3.0/go.mod h1:rMN7m0ApCowcoDlypBHlkNbp5eJQf/+1isKykIP5ZnM=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quasilyte/go-consistent v0.0.0-20190521200055-c6f3937de18c/go.mod h1:5STLWrekHfjyYwxBRVRXNOSewLJ3PWfDJd1VyTS21fI=
github.com/quasilyte/go-ruleguard v0.1.2-0.20200318202121-b00d7a75d3d8 h1:DvnesvLtRPQOvaUbfXfh0tpMHg29by0H7F2U+QIkSu8=
Expand Down Expand Up @@ -1536,6 +1540,7 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
Loading

0 comments on commit 328a3be

Please sign in to comment.