Skip to content

Commit

Permalink
exp: jobsdb bench
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed Oct 22, 2024
1 parent da9f8c8 commit 85befe3
Show file tree
Hide file tree
Showing 4 changed files with 486 additions and 2 deletions.
1 change: 1 addition & 0 deletions jobsdb/cmd/bench/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
333 changes: 333 additions & 0 deletions jobsdb/cmd/bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
package main

import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

"golang.org/x/sync/errgroup"

"github.com/google/uuid"
_ "github.com/lib/pq" // PostgreSQL driver
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/phayes/freeport"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"

_ "embed"
"encoding/json"

"github.com/rudderlabs/rudder-server/jobsdb"

"bufio"
)

var (
duration time.Duration
batchSize int
eventCounter int64 // Atomic counter for total events processed
writeWorkers int // Number of write workers
binaryPayload bool
compression bool
postgresVersion string // New variable for PostgreSQL version
)

//go:embed init.sql
var initSQL string

//go:embed payload.json
var payloadJSON []byte

var eventPayload []byte

func init() {
flag.DurationVar(&duration, "duration", 1*time.Minute, "Duration to run the test")
flag.IntVar(&batchSize, "batch", 10000, "Batch size for storing and reading events")
flag.IntVar(&writeWorkers, "writers", 2, "Number of write workers")
flag.BoolVar(&binaryPayload, "binary", false, "Use binary payload")
flag.BoolVar(&compression, "compression", false, "Enable payload compression")
flag.StringVar(&postgresVersion, "postgres-version", "15", "PostgreSQL version to use")

var err error
eventPayload, err = json.Marshal(json.RawMessage(payloadJSON))
if err != nil {
panic(err)

Check warning on line 63 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L52-L63

Added lines #L52 - L63 were not covered by tests
}
}

type SimpleCleaner struct{}

func (sc *SimpleCleaner) Cleanup(f func()) { f() }
func (sc *SimpleCleaner) Log(args ...interface{}) {}
func (sc *SimpleCleaner) Logf(format string, args ...interface{}) {}
func (sc *SimpleCleaner) Failed() bool { return false }

Check warning on line 72 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L69-L72

Added lines #L69 - L72 were not covered by tests

func generateRandomString(length int) string {
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
b := make([]byte, length)
for i := range b {
b[i] = charset[seededRand.Intn(len(charset))]
}
return string(b)

Check warning on line 81 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L74-L81

Added lines #L74 - L81 were not covered by tests
}

func main() {
flag.Parse()

// Create a new Docker pool
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

Check warning on line 91 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L84-L91

Added lines #L84 - L91 were not covered by tests

// Generate random password
randomPassword := generateRandomString(16)

// Find a free port
freePort, err := freeport.GetFreePort()
if err != nil {
log.Fatalf("Could not get free port: %s", err)
}

Check warning on line 100 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L94-L100

Added lines #L94 - L100 were not covered by tests

// Create a temporary file for the init script
tmpfile, err := os.CreateTemp("", "init.sql")
if err != nil {
log.Fatalf("Could not create temp file: %s", err)
}
defer os.Remove(tmpfile.Name())

if _, err := tmpfile.WriteString(initSQL); err != nil {
log.Fatalf("Could not write to temp file: %s", err)
}
if err := tmpfile.Close(); err != nil {
log.Fatalf("Could not close temp file: %s", err)
}

Check warning on line 114 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L103-L114

Added lines #L103 - L114 were not covered by tests

// Start a PostgreSQL container
// docker.io/rudderstack/postgres:bitnamiDoremonPocket15
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "postgres",
Tag: postgresVersion, // Use the configurable version
Env: []string{
"POSTGRES_PASSWORD=" + randomPassword,
"POSTGRES_DB=testdb",
},
Mounts: []string{
fmt.Sprintf("%s:/docker-entrypoint-initdb.d/init.sql", tmpfile.Name()),
},
Cmd: []string{
"postgres",
"-c", "shared_preload_libraries=pg_stat_statements",
},
PortBindings: map[docker.Port][]docker.PortBinding{
"5432/tcp": {{HostIP: "0.0.0.0", HostPort: fmt.Sprintf("%d", freePort)}},
},
}, func(config *docker.HostConfig) {
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
if err != nil {
log.Fatalf("Could not start resource: %s", err)
}

Check warning on line 143 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L118-L143

Added lines #L118 - L143 were not covered by tests

// Construct the database URL
dbURL := fmt.Sprintf("postgres://postgres:%s@localhost:%d/testdb?sslmode=disable", randomPassword, freePort)

// Print the database URL
fmt.Printf("Database URL: %s\n", dbURL)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

Sensitive data returned by an access to randomPassword
flows to a logging call.

// Create a connection to the database
var db *sql.DB
err = pool.Retry(func() error {
var err error
db, err = sql.Open("postgres", dbURL)
if err != nil {
return err
}
return db.Ping()

Check warning on line 159 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L146-L159

Added lines #L146 - L159 were not covered by tests
})
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

Check warning on line 163 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L161-L163

Added lines #L161 - L163 were not covered by tests

fmt.Println("Successfully connected to database")

// Ensure the container is removed when we're done
defer func() {
if err := pool.Purge(resource); err != nil {
log.Printf("Could not purge resource: %s", err)
}

Check warning on line 171 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L165-L171

Added lines #L165 - L171 were not covered by tests
}()

c := config.New()
c.Set("INSTANCE_ID", "1")

opts := []jobsdb.OptsFunc{
jobsdb.WithDBHandle(db), jobsdb.WithConfig(c), jobsdb.WithStats(stats.NOP),
}
if binaryPayload {
opts = append(opts, jobsdb.WithBinaryPayload(true))
}
if compression {
opts = append(opts, jobsdb.WithPayloadCompression(true))
}

Check warning on line 185 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L174-L185

Added lines #L174 - L185 were not covered by tests

// Use the db connection for your jobsdb
jobsDB := jobsdb.NewForReadWrite("bench_db", opts...)
if err := jobsDB.Start(); err != nil {
panic(err)

Check warning on line 190 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L188-L190

Added lines #L188 - L190 were not covered by tests
}

defer jobsDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

// Create a separate context for signal handling
sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

g, ctx := errgroup.WithContext(ctx)
start := time.Now()

// Start multiple store workers
for i := 0; i < writeWorkers; i++ {
g.Go(func() error {
return storeWorker(ctx, jobsDB)
})

Check warning on line 209 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L193-L209

Added lines #L193 - L209 were not covered by tests
}

// Start the read worker
g.Go(func() error {
return readWorker(ctx, jobsDB)
})

Check warning on line 215 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L213-L215

Added lines #L213 - L215 were not covered by tests

// Start the status update goroutine
g.Go(func() error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
lastCount := int64(0)
lastTime := start

for {
select {
case <-ctx.Done():
return ctx.Err()
case t := <-ticker.C:
currentCount := atomic.LoadInt64(&eventCounter)
currentTime := t
duration := currentTime.Sub(lastTime)
rate := float64(currentCount-lastCount) / duration.Seconds()

fmt.Printf("[%s] Processed %d events. Current rate: %.2f events/second\n",
currentTime.Format("15:04:05"), currentCount, rate)

lastCount = currentCount
lastTime = currentTime

Check warning on line 238 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L218-L238

Added lines #L218 - L238 were not covered by tests
}
}
})

// Wait for either the workers to finish or a signal
select {
case <-ctx.Done():
if err := g.Wait(); err != nil && err != context.DeadlineExceeded {
log.Printf("Error occurred: %v", err)
}
case <-sigCtx.Done():
log.Println("Received termination signal")
cancel() // Cancel the worker context
if err := g.Wait(); err != nil && err != context.Canceled {
log.Printf("Error occurred during shutdown: %v", err)
}

Check warning on line 254 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L244-L254

Added lines #L244 - L254 were not covered by tests
}

elapsed := time.Since(start)
totalEvents := atomic.LoadInt64(&eventCounter)
fmt.Printf("\nFinal results:\n")
fmt.Printf("Processed %d events in %v\n", totalEvents, elapsed)
fmt.Printf("Average rate: %.2f events/second\n", float64(totalEvents)/elapsed.Seconds())
fmt.Printf("Database URL: %s\n", dbURL)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

Sensitive data returned by an access to randomPassword
flows to a logging call.

// Wait for user confirmation before exiting
fmt.Println("\nPress Enter to quit...")
bufio.NewReader(os.Stdin).ReadString('\n')

Check failure on line 266 in jobsdb/cmd/bench/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `(*bufio.Reader).ReadString` is not checked (errcheck)

Check warning on line 266 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L257-L266

Added lines #L257 - L266 were not covered by tests
}

func storeWorker(ctx context.Context, db jobsdb.JobsDB) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
batch := make([]*jobsdb.JobT, 0, batchSize)
for j := 0; j < batchSize; j++ {
job := &jobsdb.JobT{
UserID: fmt.Sprintf("user-%d", atomic.AddInt64(&eventCounter, 1)),
UUID: uuid.New(),
Parameters: []byte(fmt.Sprintf(`{"event_id": %d}`, atomic.LoadInt64(&eventCounter))),
CustomVal: "benchmark",
EventPayload: eventPayload,
}
batch = append(batch, job)
}

Check warning on line 285 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L269-L285

Added lines #L269 - L285 were not covered by tests

if err := db.Store(ctx, batch); err != nil {
return fmt.Errorf("failed to store batch: %w", err)
}

Check warning on line 289 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L287-L289

Added lines #L287 - L289 were not covered by tests
}
}
}

func readWorker(ctx context.Context, db jobsdb.JobsDB) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
jobs, err := db.GetUnprocessed(ctx, jobsdb.GetQueryParams{
JobsLimit: batchSize,
// ... existing query params ...
})
if err != nil {
return fmt.Errorf("failed to retrieve jobs: %w", err)
}

Check warning on line 306 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L294-L306

Added lines #L294 - L306 were not covered by tests

if len(jobs.Jobs) == 0 {
time.Sleep(10 * time.Millisecond) // Avoid tight loop if no jobs
continue

Check warning on line 310 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L308-L310

Added lines #L308 - L310 were not covered by tests
}

var statusList []*jobsdb.JobStatusT
for _, job := range jobs.Jobs {

statusList = append(statusList, &jobsdb.JobStatusT{
JobID: job.JobID,
JobState: jobsdb.Succeeded.State,
AttemptNum: 1,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "200",
ErrorResponse: []byte(`{"success": true}`),
Parameters: []byte(`{"event_id": 1}`),
})
}

Check warning on line 326 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L313-L326

Added lines #L313 - L326 were not covered by tests

if err := db.UpdateJobStatus(ctx, statusList, []string{}, []jobsdb.ParameterFilterT{}); err != nil {
return fmt.Errorf("failed to update job status: %w", err)
}

Check warning on line 330 in jobsdb/cmd/bench/main.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/main.go#L328-L330

Added lines #L328 - L330 were not covered by tests
}
}
}
Loading

0 comments on commit 85befe3

Please sign in to comment.