From f5b638fc595fb6cc2e06949e66bf554708c79a76 Mon Sep 17 00:00:00 2001 From: Larisa Ustalov Date: Mon, 20 Aug 2018 11:54:08 +0300 Subject: [PATCH] concurrency and results collection --- cmd/gemini/root.go | 87 ++++++++++++++++++++++++++++++++++++++-------- schema.go | 64 ++++++++++++++++++++-------------- session.go | 18 ++++++---- 3 files changed, 122 insertions(+), 47 deletions(-) diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index 1c713127..76df0b2a 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -4,20 +4,46 @@ package main import ( "fmt" + "math/rand" + "github.com/scylladb/gemini" "github.com/spf13/cobra" - "math/rand" ) var ( testClusterHost string oracleClusterHost string maxTests int + threads int + pkNumberPerThread int seed int dropSchema bool verbose bool ) +type Status struct { + WriteOps int + WriteErrors int + ReadOps int + ReadErrors int +} + +func collectResults(one Status, sum Status) Status { + sum.WriteOps += one.WriteOps + sum.WriteErrors += one.WriteErrors + sum.ReadOps += one.ReadOps + sum.ReadErrors += one.ReadErrors + return sum +} + +func printResults(r Status) { + fmt.Println("Results:") + fmt.Printf("\twrite ops: %v\n", r.WriteOps) + fmt.Printf("\twrite errors: %v\n", r.WriteErrors) + fmt.Printf("\tread ops: %v\n", r.ReadOps) + fmt.Printf("\tread errors: %v\n", r.ReadErrors) +} + func run(cmd *cobra.Command, args []string) { rand.Seed(int64(seed)) fmt.Printf("Seed: %d\n", seed) @@ -34,21 +60,21 @@ func run(cmd *cobra.Command, args []string) { schemaBuilder.Table(gemini.Table{ Name: "data", PartitionKeys: []gemini.ColumnDef{ - gemini.ColumnDef{ + { Name: "pk", Type: "int", }, }, ClusteringKeys: []gemini.ColumnDef{ - gemini.ColumnDef{ + { Name: "ck", Type: "int", }, }, Columns: []gemini.ColumnDef{ - gemini.ColumnDef{ + { Name: "n", - Type: "int", + Type: "blob", }, }, }) @@ -74,33 +100,62 @@ func run(cmd *cobra.Command, args []string) { } } - nrPassedTests := 0 + runJob(MixedJob, schema, session) +} + +func runJob(f func(gemini.Schema, *gemini.Session, gemini.PartitionRange, chan Status), schema gemini.Schema, s *gemini.Session) { + testRes := Status{} + c := make(chan Status) + minRange := 0 + maxRange := pkNumberPerThread + + for i := 0; i < threads; i++ { + p := gemini.PartitionRange{Min: minRange + i*maxRange, Max: maxRange + i*maxRange} + go f(schema, s, p, c) + } + + for i := 0; i < threads; i++ { + res := <-c + testRes = collectResults(res, testRes) + } + + printResults(testRes) +} + +func MixedJob(schema gemini.Schema, s *gemini.Session, p gemini.PartitionRange, c chan Status) { + testStatus := Status{} for i := 0; i < maxTests; i++ { - mutateStmt := schema.GenMutateStmt() + mutateStmt := schema.GenMutateStmt(&p) mutateQuery := mutateStmt.Query mutateValues := mutateStmt.Values() if verbose { fmt.Printf("%s (values=%v)\n", mutateQuery, mutateValues) } - if err := session.Mutate(mutateQuery, mutateValues...); err != nil { + testStatus.WriteOps++ + if err := s.Mutate(mutateQuery, mutateValues...); err != nil { fmt.Printf("Failed! Mutation '%s' (values=%v) caused an error: '%v'\n", mutateQuery, mutateValues, err) - return + testStatus.WriteErrors++ } - checkStmt := schema.GenCheckStmt() + checkStmt := schema.GenCheckStmt(&p) checkQuery := checkStmt.Query checkValues := checkStmt.Values() if verbose { fmt.Printf("%s (values=%v)\n", checkQuery, checkValues) } - if diff := session.Check(checkQuery, checkValues...); diff != "" { - fmt.Printf("Failed! Check '%s' (values=%v) rows differ (-oracle +test)\n%s", checkQuery, checkValues, diff) - return + err := s.Check(checkQuery, checkValues...) + if err == nil { + testStatus.ReadOps++ + } else { + if err != gemini.ErrReadNoDataReturned { + fmt.Printf("Failed! Check '%s' (values=%v)\n%s\n", checkQuery, checkValues, err) + testStatus.ReadErrors++ + } } - nrPassedTests++ } - fmt.Printf("OK, passed %d tests.\n", nrPassedTests) + + c <- testStatus } var rootCmd = &cobra.Command{ @@ -118,6 +173,8 @@ func init() { rootCmd.Flags().StringVarP(&oracleClusterHost, "oracle-cluster", "o", "", "Host name of the oracle cluster that provides correct answers") rootCmd.MarkFlagRequired("oracle-cluster") rootCmd.Flags().IntVarP(&maxTests, "max-tests", "m", 100, "Maximum number of test iterations to run") + rootCmd.Flags().IntVarP(&threads, "threads", "c", 10, "Number of threads to run concurrently") + rootCmd.Flags().IntVarP(&pkNumberPerThread, "max-pk-per-thread", "p", 50, "Maximum number of partition keys per thread") rootCmd.Flags().IntVarP(&seed, "seed", "s", 1, "PRNG seed value") rootCmd.Flags().BoolVarP(&dropSchema, "drop-schema", "d", false, "Drop schema before starting tests run") rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output during test run") diff --git a/schema.go b/schema.go index 70e513f7..c6ea9a3e 100644 --- a/schema.go +++ b/schema.go @@ -4,6 +4,8 @@ import ( "fmt" "math/rand" "strings" + + "github.com/google/uuid" ) type Keyspace struct { @@ -25,8 +27,8 @@ type Table struct { type Schema interface { GetDropSchema() []string GetCreateSchema() []string - GenMutateStmt() *Stmt - GenCheckStmt() *Stmt + GenMutateStmt(*PartitionRange) *Stmt + GenCheckStmt(*PartitionRange) *Stmt } type Stmt struct { @@ -39,6 +41,15 @@ type schema struct { table Table } +type PartitionRange struct { + Min int `default:0` + Max int `default:100` +} + +func randRange(min int, max int) int { + return rand.Intn(max-min) + min +} + func (s *schema) GetDropSchema() []string { return []string{ fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", s.keyspace.Name), @@ -68,7 +79,7 @@ func (s *schema) GetCreateSchema() []string { } } -func (s *schema) GenMutateStmt() *Stmt { +func (s *schema) GenMutateStmt(p *PartitionRange) *Stmt { columns := []string{} values := []string{} for _, pk := range s.table.PartitionKeys { @@ -89,34 +100,35 @@ func (s *schema) GenMutateStmt() *Stmt { Values: func() []interface{} { values := make([]interface{}, 0) for _, _ = range s.table.PartitionKeys { - values = append(values, rand.Intn(100)) + values = append(values, randRange(p.Min, p.Max)) } for _, _ = range s.table.ClusteringKeys { - values = append(values, rand.Intn(100)) + values = append(values, randRange(p.Min, p.Max)) } for _, _ = range s.table.Columns { - values = append(values, rand.Intn(100)) + r, _ := uuid.NewRandom() + values = append(values, r.String()) } return values }, } } -func (s *schema) GenCheckStmt() *Stmt { +func (s *schema) GenCheckStmt(p *PartitionRange) *Stmt { switch n := rand.Intn(4); n { case 0: - return s.genSinglePartitionQuery() + return s.genSinglePartitionQuery(p) case 1: - return s.genMultiplePartitionQuery() + return s.genMultiplePartitionQuery(p) case 2: - return s.genClusteringRangeQuery() + return s.genClusteringRangeQuery(p) case 3: - return s.genClusteringRangeQueryComplex() + return s.genClusteringRangeQueryComplex(p) } return nil } -func (s *schema) genSinglePartitionQuery() *Stmt { +func (s *schema) genSinglePartitionQuery(p *PartitionRange) *Stmt { relations := []string{} for _, pk := range s.table.PartitionKeys { relations = append(relations, fmt.Sprintf("%s = ?", pk.Name)) @@ -125,7 +137,7 @@ func (s *schema) genSinglePartitionQuery() *Stmt { values := func() []interface{} { values := make([]interface{}, 0) for _, _ = range s.table.PartitionKeys { - values = append(values, rand.Intn(100)) + values = append(values, randRange(p.Min, p.Max)) } return values } @@ -135,7 +147,7 @@ func (s *schema) genSinglePartitionQuery() *Stmt { } } -func (s *schema) genMultiplePartitionQuery() *Stmt { +func (s *schema) genMultiplePartitionQuery(p *PartitionRange) *Stmt { relations := []string{} for _, pk := range s.table.PartitionKeys { relations = append(relations, fmt.Sprintf("%s IN (?)", pk.Name)) @@ -146,7 +158,7 @@ func (s *schema) genMultiplePartitionQuery() *Stmt { for _, _ = range s.table.PartitionKeys { keys := []int{} for i := 0; i < rand.Intn(10); i++ { - keys = append(keys, rand.Intn(100)) + keys = append(keys, randRange(p.Min, p.Max)) } values = append(values, keys) } @@ -158,7 +170,7 @@ func (s *schema) genMultiplePartitionQuery() *Stmt { } } -func (s *schema) genClusteringRangeQuery() *Stmt { +func (s *schema) genClusteringRangeQuery(p *PartitionRange) *Stmt { relations := []string{} for _, pk := range s.table.PartitionKeys { relations = append(relations, fmt.Sprintf("%s = ?", pk.Name)) @@ -170,11 +182,11 @@ func (s *schema) genClusteringRangeQuery() *Stmt { values := func() []interface{} { values := make([]interface{}, 0) for _, _ = range s.table.PartitionKeys { - values = append(values, rand.Intn(100)) + values = append(values, randRange(p.Min, p.Max)) } for _, _ = range s.table.ClusteringKeys { - start := rand.Intn(100) - end := start + rand.Intn(100) + start := randRange(p.Min, p.Max) + end := start + randRange(p.Min, p.Max) values = append(values, start) values = append(values, end) } @@ -186,27 +198,27 @@ func (s *schema) genClusteringRangeQuery() *Stmt { } } -func (s *schema) genClusteringRangeQueryComplex() *Stmt { +func (s *schema) genClusteringRangeQueryComplex(p *PartitionRange) *Stmt { relations := []string{} for _, pk := range s.table.PartitionKeys { relations = append(relations, fmt.Sprintf("%s = ?", pk.Name)) } for _, ck := range s.table.ClusteringKeys { - relations = append(relations, fmt.Sprintf("%s > ? AND %s < ? AND %s > ? and %s < ?", ck.Name, ck.Name, ck.Name, ck.Name)) + relations = append(relations, fmt.Sprintf("%s > ? AND %s < ? OR %s > ? AND %s < ?", ck.Name, ck.Name, ck.Name, ck.Name)) } query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.keyspace.Name, s.table.Name, strings.Join(relations, " AND ")) values := func() []interface{} { values := make([]interface{}, 0) for _, _ = range s.table.PartitionKeys { - values = append(values, rand.Intn(100)) + values = append(values, randRange(p.Min, p.Max)) } for _, _ = range s.table.ClusteringKeys { - start := rand.Intn(100) - end := start + rand.Intn(100) + start := randRange(p.Min, p.Max) + end := start + randRange(p.Min, p.Max) values = append(values, start) values = append(values, end) - start = rand.Intn(100) - end = start + rand.Intn(100) + start = randRange(p.Min, p.Max) + end = start + randRange(p.Min, p.Max) values = append(values, start) values = append(values, end) } diff --git a/session.go b/session.go index c2bd06ca..0c4c7fe6 100644 --- a/session.go +++ b/session.go @@ -1,12 +1,12 @@ package gemini import ( + "errors" + "fmt" "time" "github.com/gocql/gocql" "github.com/google/go-cmp/cmp" - - "fmt" ) type Session struct { @@ -14,6 +14,10 @@ type Session struct { oracleSession *gocql.Session } +var ( + ErrReadNoDataReturned = errors.New("read: no data returned") +) + func NewSession(testClusterHost string, oracleClusterHost string) *Session { testCluster := gocql.NewCluster(testClusterHost) testCluster.Timeout = 5 * time.Second @@ -50,7 +54,7 @@ func (s *Session) Mutate(query string, values ...interface{}) error { return nil } -func (s *Session) Check(query string, values ...interface{}) string { +func (s *Session) Check(query string, values ...interface{}) error { testIter := s.testSession.Query(query, values...).Iter() oracleIter := s.oracleSession.Query(query, values...).Iter() for { @@ -62,9 +66,11 @@ func (s *Session) Check(query string, values ...interface{}) string { if !oracleIter.MapScan(oracleRow) { break } - if diff := cmp.Diff(oracleRow, testRow); diff != "" { - return diff + diff := cmp.Diff(oracleRow, testRow) + if diff != "" { + return fmt.Errorf("rows differ (-%v +%v): %v", oracleRow, testRow, diff) } + return nil } - return "" + return ErrReadNoDataReturned }