From 03d0755e011592e884bf0242eac8d8a077cd5342 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 22 Jul 2024 21:38:36 +0300 Subject: [PATCH 01/17] fix: change sequence for numbers Signed-off-by: Vladislav Sukhin --- pkg/repository/result/mongo_numbers.go | 148 +++++++++++++------------ 1 file changed, 75 insertions(+), 73 deletions(-) diff --git a/pkg/repository/result/mongo_numbers.go b/pkg/repository/result/mongo_numbers.go index 170ebbc04c1..50aff29cadf 100644 --- a/pkg/repository/result/mongo_numbers.go +++ b/pkg/repository/result/mongo_numbers.go @@ -2,126 +2,128 @@ package result import ( "context" - "strings" + "fmt" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - - "github.com/kubeshop/testkube/pkg/api/v1/testkube" ) type oldExecutionNumber struct { - TestName string `json:"testName"` - Number int `json:"number"` -} - -type executionNumber struct { Name string `json:"name"` Number int `json:"number"` IsTestSuite bool `json:"isTestSuite"` } -func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { - err = r.convertFromOldToNew() +type executionNumber struct { + Id string `bson:"_id"` + Number int `bson:"number"` + ExecutionType string `bson:"executionType"` +} + +func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string, executionType string) (number int32, err error) { + oldName := getOldName(name, executionType) + number, err = r.getOldNumber(ctx, oldName) if err != nil { - return 1, err + return 0, err } - // TODO: modify this when we decide to update the interfaces for OSS and cloud - isTestSuite := strings.HasPrefix(name, "ts-") - - execNmbr := executionNumber{Name: name, IsTestSuite: isTestSuite} - retry := false - retryAttempts := 0 - maxRetries := 10 - + id := getMongoId(name, executionType) + executionNumber := executionNumber{ + Id: id, + Number: int(number), + ExecutionType: executionType, + } opts := options.FindOneAndUpdate() opts.SetUpsert(true) - opts.SetReturnDocument(options.After) - err = r.SequencesColl.FindOne(ctx, bson.M{"name": name}).Decode(&execNmbr) - if err != nil { - var execution testkube.Execution - number, _ = r.GetLatestTestNumber(ctx, name) - if number == 0 { - execNmbr.Number = 1 - } else { - execNmbr.Number = int(execution.Number) + 1 + err = r.SequencesColl.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$set": executionNumber}, opts).Err() + if err != nil && !mongo.IsDuplicateKeyError(err) { + return 0, err + } + + if number != 0 { + _, err = r.SequencesColl.DeleteOne(ctx, bson.M{"name": oldName}) + if err != nil { + return 0, err } - _, err = r.SequencesColl.InsertOne(ctx, execNmbr) - } else { - err = r.SequencesColl.FindOneAndUpdate(ctx, bson.M{"name": name}, bson.M{"$inc": bson.M{"number": 1}}, opts).Decode(&execNmbr) } - retry = err != nil + opts.SetUpsert(false) + opts.SetReturnDocument(options.After) - for retry { - retryAttempts++ - err = r.SequencesColl.FindOneAndUpdate(ctx, bson.M{"name": name}, bson.M{"$inc": bson.M{"number": 1}}, opts).Decode(&execNmbr) - if err == nil || retryAttempts >= maxRetries { - retry = false - } + err = r.SequencesColl.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$inc": bson.M{"number": 1}}, opts).Decode(&executionNumber) + if err == nil { + return 0, err } - return int32(execNmbr.Number), nil + return int32(executionNumber.Number), nil } -func (r *MongoRepository) DeleteExecutionNumber(ctx context.Context, name string) (err error) { - err = r.convertFromOldToNew() +func (r *MongoRepository) DeleteExecutionNumber(ctx context.Context, name string, executionType string) (err error) { + _, err = r.SequencesColl.DeleteOne(ctx, bson.M{"name": getOldName(name, executionType)}) if err != nil { return err } - _, err = r.SequencesColl.DeleteOne(ctx, bson.M{"name": name}) + + _, err = r.SequencesColl.DeleteOne(ctx, bson.M{"_id": getMongoId(name, executionType)}) return err } -func (r *MongoRepository) DeleteExecutionNumbers(ctx context.Context, names []string) (err error) { - err = r.convertFromOldToNew() +func (r *MongoRepository) DeleteExecutionNumbers(ctx context.Context, names []string, executionType string) (err error) { + ids := make([]string, len(names)) + for i := range names { + ids[i] = getOldName(names[i], executionType) + } + + _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"name": bson.M{"$in": ids}}) if err != nil { return err } - _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"name": bson.M{"$in": names}}) + + for i := range names { + ids[i] = getMongoId(names[i], executionType) + } + + _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}}) return err } -func (r *MongoRepository) DeleteAllExecutionNumbers(ctx context.Context, isTestSuite bool) (err error) { - err = r.convertFromOldToNew() +func (r *MongoRepository) DeleteAllExecutionNumbers(ctx context.Context, executionType string) (err error) { + isTestSuite := false + if executionType == "testsuite" { + isTestSuite = true + } + + _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"istestsuite": isTestSuite}) if err != nil { return err } - _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"istestsuite": isTestSuite}) + + _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"executionType": executionType}) return err } -func (r *MongoRepository) convertFromOldToNew() error { - filter := bson.M{"testname": bson.M{"$exists": true}} +func (r *MongoRepository) getOldNumber(ctx context.Context, name string) (int32, error) { + var executionNumber oldExecutionNumber - cursor, err := r.SequencesColl.Find(context.Background(), filter) - if err != nil { - return err + err := r.SequencesColl.FindOne(ctx, bson.M{"name": name}).Decode(&executionNumber) + if err != nil && err != mongo.ErrNoDocuments { + return 0, err } - defer cursor.Close(context.Background()) - for cursor.Next(context.Background()) { - var entry oldExecutionNumber - err := cursor.Decode(&entry) - if err != nil { - return err - } - - isTestSuite := strings.HasPrefix(entry.TestName, "ts-") + return int32(executionNumber.Number), nil +} - newEntry := executionNumber{ - Name: entry.TestName, - Number: entry.Number, - IsTestSuite: isTestSuite, - } +func getMongoId(name string, executionType string) string { + return fmt.Sprintf("%s-%s", name, executionType) +} - _, err = r.SequencesColl.InsertOne(context.Background(), newEntry) - if err != nil { - return err - } +func getOldName(name string, executionType string) string { + oldPrefix := "" + if executionType == "testsuite" { + oldPrefix = "ts-" } - _, err = r.SequencesColl.DeleteMany(context.Background(), filter) - return err + + return fmt.Sprintf("%s%s", oldPrefix, name) } From 213e3be612692a28cb80b2a0acc8a75587345b56 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 22 Jul 2024 21:45:46 +0300 Subject: [PATCH 02/17] fix: add line Signed-off-by: Vladislav Sukhin --- pkg/repository/result/mongo_numbers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/repository/result/mongo_numbers.go b/pkg/repository/result/mongo_numbers.go index 50aff29cadf..732390931c9 100644 --- a/pkg/repository/result/mongo_numbers.go +++ b/pkg/repository/result/mongo_numbers.go @@ -34,6 +34,7 @@ func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name strin Number: int(number), ExecutionType: executionType, } + opts := options.FindOneAndUpdate() opts.SetUpsert(true) From 7bbc06385430ac20eb442c8a9c2d9ce8f43ffbdb Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 23 Jul 2024 15:55:38 +0300 Subject: [PATCH 03/17] fix: add sequence repository Signed-off-by: Vladislav Sukhin --- pkg/repository/result/interface.go | 4 +- pkg/repository/result/mongo.go | 89 +++++------ pkg/repository/result/mongo_numbers.go | 130 ---------------- pkg/repository/sequence/interface.go | 17 ++ pkg/repository/sequence/mock_repository.go | 92 +++++++++++ pkg/repository/sequence/mongo.go | 172 +++++++++++++++++++++ 6 files changed, 328 insertions(+), 176 deletions(-) delete mode 100644 pkg/repository/result/mongo_numbers.go create mode 100644 pkg/repository/sequence/interface.go create mode 100644 pkg/repository/sequence/mock_repository.go create mode 100644 pkg/repository/sequence/mongo.go diff --git a/pkg/repository/result/interface.go b/pkg/repository/result/interface.go index 11c8d157b57..b23c6a09ea7 100644 --- a/pkg/repository/result/interface.go +++ b/pkg/repository/result/interface.go @@ -80,8 +80,8 @@ type Repository interface { } type Sequences interface { - // GetNextExecutionNumber gets next execution number by test name - GetNextExecutionNumber(ctx context.Context, testName string) (number int32, err error) + // GetNextExecutionNumber gets next execution number by name + GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) } //go:generate mockgen -destination=./mock_output_repository.go -package=result "github.com/kubeshop/testkube/pkg/repository/result" OutputRepository diff --git a/pkg/repository/result/mongo.go b/pkg/repository/result/mongo.go index 0f93a91fef3..1bb2f4187e8 100644 --- a/pkg/repository/result/mongo.go +++ b/pkg/repository/result/mongo.go @@ -2,6 +2,7 @@ package result import ( "context" + "errors" "fmt" "strings" "time" @@ -18,14 +19,14 @@ import ( "github.com/kubeshop/testkube/pkg/featureflags" "github.com/kubeshop/testkube/pkg/log" logsclient "github.com/kubeshop/testkube/pkg/logs/client" + "github.com/kubeshop/testkube/pkg/repository/sequence" "github.com/kubeshop/testkube/pkg/storage" ) var _ Repository = (*MongoRepository)(nil) const ( - CollectionResults = "results" - CollectionSequences = "sequences" + CollectionResults = "results" // OutputPrefixSize is the size of the beginning of execution output in case this doesn't fit into Mongo OutputPrefixSize = 1 * 1024 * 1024 // OutputMaxSize is the size of the execution output in case this doesn't fit into the 16 MB limited by Mongo @@ -41,7 +42,6 @@ func NewMongoRepository(db *mongo.Database, allowDiskUse, isDocDb bool, opts ... r := &MongoRepository{ db: db, ResultsColl: db.Collection(CollectionResults), - SequencesColl: db.Collection(CollectionSequences), OutputRepository: NewMongoOutputRepository(db), allowDiskUse: allowDiskUse, isDocDb: isDocDb, @@ -64,7 +64,6 @@ func NewMongoRepositoryWithOutputRepository( r := &MongoRepository{ db: db, ResultsColl: db.Collection(CollectionResults), - SequencesColl: db.Collection(CollectionSequences), OutputRepository: outputRepository, allowDiskUse: allowDiskUse, log: log.DefaultLogger, @@ -77,28 +76,29 @@ func NewMongoRepositoryWithOutputRepository( return r } -func NewMongoRepositoryWithMinioOutputStorage(db *mongo.Database, allowDiskUse bool, storageClient storage.Client, bucket string) *MongoRepository { +func NewMongoRepositoryWithMinioOutputStorage(db *mongo.Database, allowDiskUse bool, + storageClient storage.Client, bucket string, sequenceRepository sequence.Repository) *MongoRepository { repo := MongoRepository{ - db: db, - ResultsColl: db.Collection(CollectionResults), - SequencesColl: db.Collection(CollectionSequences), - allowDiskUse: allowDiskUse, - log: log.DefaultLogger, + db: db, + ResultsColl: db.Collection(CollectionResults), + allowDiskUse: allowDiskUse, + log: log.DefaultLogger, + sequenceRepository: sequenceRepository, } repo.OutputRepository = NewMinioOutputRepository(storageClient, repo.ResultsColl, bucket) return &repo } type MongoRepository struct { - db *mongo.Database - ResultsColl *mongo.Collection - SequencesColl *mongo.Collection - OutputRepository OutputRepository - logGrpcClient logsclient.StreamGetter - allowDiskUse bool - isDocDb bool - features featureflags.FeatureFlags - log *zap.SugaredLogger + db *mongo.Database + ResultsColl *mongo.Collection + OutputRepository OutputRepository + logGrpcClient logsclient.StreamGetter + allowDiskUse bool + isDocDb bool + features featureflags.FeatureFlags + log *zap.SugaredLogger + sequenceRepository sequence.Repository } type MongoRepositoryOpt func(*MongoRepository) @@ -121,9 +121,9 @@ func WithMongoRepositoryResultCollection(collection *mongo.Collection) MongoRepo } } -func WithMongoRepositorySequenceCollection(collection *mongo.Collection) MongoRepositoryOpt { +func WithMongoRepositorySequenceCollection(sequenceRepository sequence.Repository) MongoRepositoryOpt { return func(r *MongoRepository) { - r.SequencesColl = collection + r.sequenceRepository = sequenceRepository } } @@ -695,9 +695,11 @@ func (r *MongoRepository) DeleteByTest(ctx context.Context, testName string) (er if err != nil { return } - err = r.DeleteExecutionNumber(ctx, testName) - if err != nil { - return + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteExecutionNumber(ctx, testName, sequence.ExecutionTypeTest) + if err != nil { + return + } } _, err = r.ResultsColl.DeleteMany(ctx, bson.M{"testname": testName}) return @@ -709,10 +711,6 @@ func (r *MongoRepository) DeleteByTestSuite(ctx context.Context, testSuiteName s if err != nil { return } - err = r.DeleteExecutionNumber(ctx, testSuiteName) - if err != nil { - return - } _, err = r.ResultsColl.DeleteMany(ctx, bson.M{"testsuitename": testSuiteName}) return } @@ -723,9 +721,11 @@ func (r *MongoRepository) DeleteAll(ctx context.Context) (err error) { if err != nil { return } - err = r.DeleteAllExecutionNumbers(ctx, false) - if err != nil { - return + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteAllExecutionNumbers(ctx, string(sequence.ExecutionTypeTest)) + if err != nil { + return + } } _, err = r.ResultsColl.DeleteMany(ctx, bson.M{}) return @@ -754,9 +754,11 @@ func (r *MongoRepository) DeleteByTests(ctx context.Context, testNames []string) return } - err = r.DeleteExecutionNumbers(ctx, testNames) - if err != nil { - return + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteExecutionNumbers(ctx, testNames, sequence.ExecutionTypeTest) + if err != nil { + return + } } _, err = r.ResultsColl.DeleteMany(ctx, filter) return @@ -785,11 +787,6 @@ func (r *MongoRepository) DeleteByTestSuites(ctx context.Context, testSuiteNames return } - err = r.DeleteExecutionNumbers(ctx, testSuiteNames) - if err != nil { - return - } - _, err = r.ResultsColl.DeleteMany(ctx, filter) return } @@ -801,11 +798,6 @@ func (r *MongoRepository) DeleteForAllTestSuites(ctx context.Context) (err error return } - err = r.DeleteAllExecutionNumbers(ctx, true) - if err != nil { - return - } - _, err = r.ResultsColl.DeleteMany(ctx, bson.M{"testsuitename": bson.M{"$ne": ""}}) return } @@ -904,3 +896,12 @@ func (r *MongoRepository) GetPreviousFinishedState(ctx context.Context, testName return *result.ExecutionResult.Status, nil } + +// GetNextExecutionNumber gets next execution number by name +func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { + if r.sequenceRepository != nil { + return 0, errors.New("no sequence repository provided") + } + + return r.sequenceRepository.GetNextExecutionNumber(ctx, name, sequence.ExecutionTypeTest) +} diff --git a/pkg/repository/result/mongo_numbers.go b/pkg/repository/result/mongo_numbers.go deleted file mode 100644 index 732390931c9..00000000000 --- a/pkg/repository/result/mongo_numbers.go +++ /dev/null @@ -1,130 +0,0 @@ -package result - -import ( - "context" - "fmt" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -type oldExecutionNumber struct { - Name string `json:"name"` - Number int `json:"number"` - IsTestSuite bool `json:"isTestSuite"` -} - -type executionNumber struct { - Id string `bson:"_id"` - Number int `bson:"number"` - ExecutionType string `bson:"executionType"` -} - -func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string, executionType string) (number int32, err error) { - oldName := getOldName(name, executionType) - number, err = r.getOldNumber(ctx, oldName) - if err != nil { - return 0, err - } - - id := getMongoId(name, executionType) - executionNumber := executionNumber{ - Id: id, - Number: int(number), - ExecutionType: executionType, - } - - opts := options.FindOneAndUpdate() - opts.SetUpsert(true) - - err = r.SequencesColl.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$set": executionNumber}, opts).Err() - if err != nil && !mongo.IsDuplicateKeyError(err) { - return 0, err - } - - if number != 0 { - _, err = r.SequencesColl.DeleteOne(ctx, bson.M{"name": oldName}) - if err != nil { - return 0, err - } - } - - opts.SetUpsert(false) - opts.SetReturnDocument(options.After) - - err = r.SequencesColl.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$inc": bson.M{"number": 1}}, opts).Decode(&executionNumber) - if err == nil { - return 0, err - } - - return int32(executionNumber.Number), nil -} - -func (r *MongoRepository) DeleteExecutionNumber(ctx context.Context, name string, executionType string) (err error) { - _, err = r.SequencesColl.DeleteOne(ctx, bson.M{"name": getOldName(name, executionType)}) - if err != nil { - return err - } - - _, err = r.SequencesColl.DeleteOne(ctx, bson.M{"_id": getMongoId(name, executionType)}) - return err -} - -func (r *MongoRepository) DeleteExecutionNumbers(ctx context.Context, names []string, executionType string) (err error) { - ids := make([]string, len(names)) - for i := range names { - ids[i] = getOldName(names[i], executionType) - } - - _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"name": bson.M{"$in": ids}}) - if err != nil { - return err - } - - for i := range names { - ids[i] = getMongoId(names[i], executionType) - } - - _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}}) - return err -} - -func (r *MongoRepository) DeleteAllExecutionNumbers(ctx context.Context, executionType string) (err error) { - isTestSuite := false - if executionType == "testsuite" { - isTestSuite = true - } - - _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"istestsuite": isTestSuite}) - if err != nil { - return err - } - - _, err = r.SequencesColl.DeleteMany(ctx, bson.M{"executionType": executionType}) - return err -} - -func (r *MongoRepository) getOldNumber(ctx context.Context, name string) (int32, error) { - var executionNumber oldExecutionNumber - - err := r.SequencesColl.FindOne(ctx, bson.M{"name": name}).Decode(&executionNumber) - if err != nil && err != mongo.ErrNoDocuments { - return 0, err - } - - return int32(executionNumber.Number), nil -} - -func getMongoId(name string, executionType string) string { - return fmt.Sprintf("%s-%s", name, executionType) -} - -func getOldName(name string, executionType string) string { - oldPrefix := "" - if executionType == "testsuite" { - oldPrefix = "ts-" - } - - return fmt.Sprintf("%s%s", oldPrefix, name) -} diff --git a/pkg/repository/sequence/interface.go b/pkg/repository/sequence/interface.go new file mode 100644 index 00000000000..c0080959735 --- /dev/null +++ b/pkg/repository/sequence/interface.go @@ -0,0 +1,17 @@ +package sequence + +import ( + "context" +) + +//go:generate mockgen -destination=./mock_repository.go -package=sequence "github.com/kubeshop/testkube/pkg/repository/sequence" Repository +type Repository interface { + // GetNextExecutionNumber gets next execution number by name and type + GetNextExecutionNumber(ctx context.Context, name string, executionType ExecutionType) (number int32, err error) + // DeleteExecutionNumber deletes execution number by name and type + DeleteExecutionNumber(ctx context.Context, name string, executionType ExecutionType) (err error) + // DeleteExecutionNumbers deletes multiple execution numbers by names and type + DeleteExecutionNumbers(ctx context.Context, names []string, executionType ExecutionType) (err error) + // DeleteAllExecutionNumbers deletes all execution numbers by type + DeleteAllExecutionNumbers(ctx context.Context, executionType string) (err error) +} diff --git a/pkg/repository/sequence/mock_repository.go b/pkg/repository/sequence/mock_repository.go new file mode 100644 index 00000000000..218413e4788 --- /dev/null +++ b/pkg/repository/sequence/mock_repository.go @@ -0,0 +1,92 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/kubeshop/testkube/pkg/repository/sequence (interfaces: Repository) + +// Package sequence is a generated GoMock package. +package sequence + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockRepository is a mock of Repository interface. +type MockRepository struct { + ctrl *gomock.Controller + recorder *MockRepositoryMockRecorder +} + +// MockRepositoryMockRecorder is the mock recorder for MockRepository. +type MockRepositoryMockRecorder struct { + mock *MockRepository +} + +// NewMockRepository creates a new mock instance. +func NewMockRepository(ctrl *gomock.Controller) *MockRepository { + mock := &MockRepository{ctrl: ctrl} + mock.recorder = &MockRepositoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRepository) EXPECT() *MockRepositoryMockRecorder { + return m.recorder +} + +// DeleteAllExecutionNumbers mocks base method. +func (m *MockRepository) DeleteAllExecutionNumbers(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteAllExecutionNumbers", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteAllExecutionNumbers indicates an expected call of DeleteAllExecutionNumbers. +func (mr *MockRepositoryMockRecorder) DeleteAllExecutionNumbers(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAllExecutionNumbers", reflect.TypeOf((*MockRepository)(nil).DeleteAllExecutionNumbers), arg0, arg1) +} + +// DeleteExecutionNumber mocks base method. +func (m *MockRepository) DeleteExecutionNumber(arg0 context.Context, arg1 string, arg2 ExecutionType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteExecutionNumber", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteExecutionNumber indicates an expected call of DeleteExecutionNumber. +func (mr *MockRepositoryMockRecorder) DeleteExecutionNumber(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExecutionNumber", reflect.TypeOf((*MockRepository)(nil).DeleteExecutionNumber), arg0, arg1, arg2) +} + +// DeleteExecutionNumbers mocks base method. +func (m *MockRepository) DeleteExecutionNumbers(arg0 context.Context, arg1 []string, arg2 ExecutionType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteExecutionNumbers", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteExecutionNumbers indicates an expected call of DeleteExecutionNumbers. +func (mr *MockRepositoryMockRecorder) DeleteExecutionNumbers(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExecutionNumbers", reflect.TypeOf((*MockRepository)(nil).DeleteExecutionNumbers), arg0, arg1, arg2) +} + +// GetNextExecutionNumber mocks base method. +func (m *MockRepository) GetNextExecutionNumber(arg0 context.Context, arg1 string, arg2 ExecutionType) (int32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNextExecutionNumber", arg0, arg1, arg2) + ret0, _ := ret[0].(int32) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNextExecutionNumber indicates an expected call of GetNextExecutionNumber. +func (mr *MockRepositoryMockRecorder) GetNextExecutionNumber(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNextExecutionNumber", reflect.TypeOf((*MockRepository)(nil).GetNextExecutionNumber), arg0, arg1, arg2) +} diff --git a/pkg/repository/sequence/mongo.go b/pkg/repository/sequence/mongo.go new file mode 100644 index 00000000000..01b62b60a12 --- /dev/null +++ b/pkg/repository/sequence/mongo.go @@ -0,0 +1,172 @@ +package sequence + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +var _ Repository = (*MongoRepository)(nil) + +const ( + CollectionSequences = "sequences" +) + +type ExecutionType string + +const ( + ExecutionTypeTest ExecutionType = "t" + ExecutionTypeTestSuite ExecutionType = "ts" + ExecutionTypeTestWorkflow ExecutionType = "tw" +) + +func NewMongoRepository(db *mongo.Database, opts ...Opt) *MongoRepository { + r := &MongoRepository{ + Coll: db.Collection(CollectionSequences), + } + + for _, opt := range opts { + opt(r) + } + + return r +} + +type Opt func(*MongoRepository) + +func WithMongoRepositoryCollection(collection *mongo.Collection) Opt { + return func(r *MongoRepository) { + r.Coll = collection + } +} + +type MongoRepository struct { + Coll *mongo.Collection +} + +type oldExecutionNumber struct { + Name string `json:"name"` + Number int `json:"number"` + IsTestSuite bool `json:"isTestSuite"` +} + +type executionNumber struct { + Id string `bson:"_id"` + Number int `bson:"number"` + ExecutionType ExecutionType `bson:"executionType"` +} + +// GetNextExecutionNumber gets next execution number by name and type +func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string, executionType ExecutionType) (number int32, err error) { + oldName := getOldName(name, executionType) + number, err = r.getOldNumber(ctx, oldName) + if err != nil { + return 0, err + } + + id := getMongoId(name, executionType) + executionNumber := executionNumber{ + Id: id, + Number: int(number), + ExecutionType: executionType, + } + + opts := options.FindOneAndUpdate() + opts.SetUpsert(true) + + err = r.Coll.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$set": executionNumber}, opts).Err() + if err != nil && !mongo.IsDuplicateKeyError(err) { + return 0, err + } + + if number != 0 { + _, err = r.Coll.DeleteOne(ctx, bson.M{"name": oldName}) + if err != nil { + return 0, err + } + } + + opts.SetUpsert(false) + opts.SetReturnDocument(options.After) + + err = r.Coll.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$inc": bson.M{"number": 1}}, opts).Decode(&executionNumber) + if err == nil { + return 0, err + } + + return int32(executionNumber.Number), nil +} + +// DeleteExecutionNumber deletes execution number by name and type +func (r *MongoRepository) DeleteExecutionNumber(ctx context.Context, name string, executionType ExecutionType) (err error) { + _, err = r.Coll.DeleteOne(ctx, bson.M{"name": getOldName(name, executionType)}) + if err != nil { + return err + } + + _, err = r.Coll.DeleteOne(ctx, bson.M{"_id": getMongoId(name, executionType)}) + return err +} + +// DeleteExecutionNumbers deletes multiple execution numbers by names and type +func (r *MongoRepository) DeleteExecutionNumbers(ctx context.Context, names []string, executionType ExecutionType) (err error) { + ids := make([]string, len(names)) + for i := range names { + ids[i] = getOldName(names[i], executionType) + } + + _, err = r.Coll.DeleteMany(ctx, bson.M{"name": bson.M{"$in": ids}}) + if err != nil { + return err + } + + for i := range names { + ids[i] = getMongoId(names[i], executionType) + } + + _, err = r.Coll.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}}) + return err +} + +// DeleteAllExecutionNumbers deletes all execution numbers by type +func (r *MongoRepository) DeleteAllExecutionNumbers(ctx context.Context, executionType string) (err error) { + isTestSuite := false + if executionType == "testsuite" { + isTestSuite = true + } + + _, err = r.Coll.DeleteMany(ctx, bson.M{"istestsuite": isTestSuite}) + if err != nil { + return err + } + + _, err = r.Coll.DeleteMany(ctx, bson.M{"executionType": executionType}) + return err +} + +func (r *MongoRepository) getOldNumber(ctx context.Context, name string) (int32, error) { + var executionNumber oldExecutionNumber + + err := r.Coll.FindOne(ctx, bson.M{"name": name}).Decode(&executionNumber) + if err != nil && err != mongo.ErrNoDocuments { + return 0, err + } + + return int32(executionNumber.Number), nil +} + +func getMongoId(name string, executionType ExecutionType) string { + return fmt.Sprintf("%s-%s", name, executionType) +} + +func getOldName(name string, executionType ExecutionType) string { + oldPrefix := "" + if executionType == "testsuite" { + oldPrefix = "ts-" + } + + return fmt.Sprintf("%s%s", oldPrefix, name) +} From db55cc7ed4c4017e8284ccb590c6716110f73a7d Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 23 Jul 2024 16:09:58 +0300 Subject: [PATCH 04/17] fix: add sequence repository Signed-off-by: Vladislav Sukhin --- cmd/api-server/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 97222a56e8f..00b8fc1cb75 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -57,6 +57,7 @@ import ( "github.com/kubeshop/testkube/pkg/cloud" configrepository "github.com/kubeshop/testkube/pkg/repository/config" "github.com/kubeshop/testkube/pkg/repository/result" + "github.com/kubeshop/testkube/pkg/repository/sequence" "github.com/kubeshop/testkube/pkg/repository/storage" "github.com/kubeshop/testkube/pkg/repository/testresult" @@ -289,7 +290,9 @@ func main() { db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, cfg.APIMongoDB, cfg.APIMongoDBType, cfg.APIMongoAllowTLS, mongoSSLConfig) exitOnError("Getting mongo database", err) isDocDb := cfg.APIMongoDBType == storage.TypeDocDB - mongoResultsRepository := result.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, result.WithFeatureFlags(features), result.WithLogsClient(logGrpcClient)) + sequenceRepository := sequence.NewMongoRepository(db) + mongoResultsRepository := result.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, result.WithFeatureFlags(features), + result.WithLogsClient(logGrpcClient), result.WithMongoRepositorySequenceCollection(sequenceRepository)) resultsRepository = mongoResultsRepository testResultsRepository = testresult.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb) testWorkflowResultsRepository = testworkflow2.NewMongoRepository(db, cfg.APIMongoAllowDiskUse) From cb97cd2cac02c58419e14a68651934ba060a5555 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 23 Jul 2024 16:43:47 +0300 Subject: [PATCH 05/17] fix: add sequence for test suites Signed-off-by: Vladislav Sukhin --- pkg/cloud/data/testresult/testresult.go | 4 ++ pkg/repository/result/mongo.go | 2 +- pkg/repository/sequence/interface.go | 2 +- pkg/repository/sequence/mongo.go | 2 +- pkg/repository/testresult/interface.go | 6 +++ pkg/repository/testresult/mock_repository.go | 15 +++++++ pkg/repository/testresult/mongo.go | 45 ++++++++++++++++++-- pkg/scheduler/test_scheduler.go | 14 +++++- pkg/scheduler/testsuite_scheduler.go | 2 +- 9 files changed, 82 insertions(+), 10 deletions(-) diff --git a/pkg/cloud/data/testresult/testresult.go b/pkg/cloud/data/testresult/testresult.go index 18f23dd080b..71e79908530 100644 --- a/pkg/cloud/data/testresult/testresult.go +++ b/pkg/cloud/data/testresult/testresult.go @@ -252,3 +252,7 @@ func (r *CloudRepository) GetPreviousFinishedState(ctx context.Context, testSuit } return commandResponse.Result, nil } + +func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { + return 0, nil +} diff --git a/pkg/repository/result/mongo.go b/pkg/repository/result/mongo.go index 1bb2f4187e8..36292a2380d 100644 --- a/pkg/repository/result/mongo.go +++ b/pkg/repository/result/mongo.go @@ -722,7 +722,7 @@ func (r *MongoRepository) DeleteAll(ctx context.Context) (err error) { return } if r.sequenceRepository != nil { - err = r.sequenceRepository.DeleteAllExecutionNumbers(ctx, string(sequence.ExecutionTypeTest)) + err = r.sequenceRepository.DeleteAllExecutionNumbers(ctx, sequence.ExecutionTypeTest) if err != nil { return } diff --git a/pkg/repository/sequence/interface.go b/pkg/repository/sequence/interface.go index c0080959735..c43bcd09888 100644 --- a/pkg/repository/sequence/interface.go +++ b/pkg/repository/sequence/interface.go @@ -13,5 +13,5 @@ type Repository interface { // DeleteExecutionNumbers deletes multiple execution numbers by names and type DeleteExecutionNumbers(ctx context.Context, names []string, executionType ExecutionType) (err error) // DeleteAllExecutionNumbers deletes all execution numbers by type - DeleteAllExecutionNumbers(ctx context.Context, executionType string) (err error) + DeleteAllExecutionNumbers(ctx context.Context, executionType ExecutionType) (err error) } diff --git a/pkg/repository/sequence/mongo.go b/pkg/repository/sequence/mongo.go index 01b62b60a12..d4ea759066f 100644 --- a/pkg/repository/sequence/mongo.go +++ b/pkg/repository/sequence/mongo.go @@ -132,7 +132,7 @@ func (r *MongoRepository) DeleteExecutionNumbers(ctx context.Context, names []st } // DeleteAllExecutionNumbers deletes all execution numbers by type -func (r *MongoRepository) DeleteAllExecutionNumbers(ctx context.Context, executionType string) (err error) { +func (r *MongoRepository) DeleteAllExecutionNumbers(ctx context.Context, executionType ExecutionType) (err error) { isTestSuite := false if executionType == "testsuite" { isTestSuite = true diff --git a/pkg/repository/testresult/interface.go b/pkg/repository/testresult/interface.go index 36dc7d6b470..e3e069fca5c 100644 --- a/pkg/repository/testresult/interface.go +++ b/pkg/repository/testresult/interface.go @@ -29,6 +29,7 @@ type Filter interface { //go:generate mockgen -destination=./mock_repository.go -package=testresult "github.com/kubeshop/testkube/pkg/repository/testresult" Repository type Repository interface { + Sequences // Get gets execution result by id or name Get(ctx context.Context, id string) (testkube.TestSuiteExecution, error) // GetByNameAndTestSuite gets execution result by name @@ -62,3 +63,8 @@ type Repository interface { // Count returns executions count Count(ctx context.Context, filter Filter) (int64, error) } + +type Sequences interface { + // GetNextExecutionNumber gets next execution number by name + GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) +} diff --git a/pkg/repository/testresult/mock_repository.go b/pkg/repository/testresult/mock_repository.go index 7e748c29566..1459dfadd1e 100644 --- a/pkg/repository/testresult/mock_repository.go +++ b/pkg/repository/testresult/mock_repository.go @@ -202,6 +202,21 @@ func (mr *MockRepositoryMockRecorder) GetLatestByTestSuites(arg0, arg1 interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestByTestSuites", reflect.TypeOf((*MockRepository)(nil).GetLatestByTestSuites), arg0, arg1) } +// GetNextExecutionNumber mocks base method. +func (m *MockRepository) GetNextExecutionNumber(arg0 context.Context, arg1 string) (int32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNextExecutionNumber", arg0, arg1) + ret0, _ := ret[0].(int32) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNextExecutionNumber indicates an expected call of GetNextExecutionNumber. +func (mr *MockRepositoryMockRecorder) GetNextExecutionNumber(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNextExecutionNumber", reflect.TypeOf((*MockRepository)(nil).GetNextExecutionNumber), arg0, arg1) +} + // GetPreviousFinishedState mocks base method. func (m *MockRepository) GetPreviousFinishedState(arg0 context.Context, arg1 string, arg2 time.Time) (testkube.TestSuiteExecutionStatus, error) { m.ctrl.T.Helper() diff --git a/pkg/repository/testresult/mongo.go b/pkg/repository/testresult/mongo.go index e9b7ec14dd5..13022171b64 100644 --- a/pkg/repository/testresult/mongo.go +++ b/pkg/repository/testresult/mongo.go @@ -2,6 +2,7 @@ package testresult import ( "context" + "errors" "fmt" "strings" "time" @@ -14,6 +15,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/repository/sequence" ) var _ Repository = (*MongoRepository)(nil) @@ -36,10 +38,11 @@ func NewMongoRepository(db *mongo.Database, allowDiskUse, isDocDb bool, opts ... } type MongoRepository struct { - db *mongo.Database - Coll *mongo.Collection - allowDiskUse bool - isDocDb bool + db *mongo.Database + Coll *mongo.Collection + allowDiskUse bool + isDocDb bool + sequenceRepository sequence.Repository } func WithMongoRepositoryCollection(collection *mongo.Collection) MongoRepositoryOpt { @@ -48,6 +51,12 @@ func WithMongoRepositoryCollection(collection *mongo.Collection) MongoRepository } } +func WithMongoRepositorySequenceCollection(sequenceRepository sequence.Repository) MongoRepositoryOpt { + return func(r *MongoRepository) { + r.sequenceRepository = sequenceRepository + } +} + type MongoRepositoryOpt func(*MongoRepository) func (r *MongoRepository) Get(ctx context.Context, id string) (result testkube.TestSuiteExecution, err error) { @@ -464,12 +473,25 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) { // DeleteByTestSuite deletes execution results by test suite func (r *MongoRepository) DeleteByTestSuite(ctx context.Context, testSuiteName string) (err error) { + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteExecutionNumber(ctx, testSuiteName, sequence.ExecutionTypeTestSuite) + if err != nil { + return + } + } _, err = r.Coll.DeleteMany(ctx, bson.M{"testsuite.name": testSuiteName}) return } // DeleteAll deletes all execution results func (r *MongoRepository) DeleteAll(ctx context.Context) (err error) { + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteAllExecutionNumbers(ctx, sequence.ExecutionTypeTestSuite) + if err != nil { + return + } + } + _, err = r.Coll.DeleteMany(ctx, bson.M{}) return } @@ -492,6 +514,12 @@ func (r *MongoRepository) DeleteByTestSuites(ctx context.Context, testSuiteNames filter = bson.M{"testsuite.name": testSuiteNames[0]} } + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteExecutionNumbers(ctx, testSuiteNames, sequence.ExecutionTypeTestSuite) + if err != nil { + return + } + } _, err = r.Coll.DeleteMany(ctx, filter) return } @@ -566,3 +594,12 @@ func (r *MongoRepository) GetPreviousFinishedState(ctx context.Context, testSuit return *result.Status, nil } + +// GetNextExecutionNumber gets next execution number by name +func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { + if r.sequenceRepository != nil { + return 0, errors.New("no sequence repository provided") + } + + return r.sequenceRepository.GetNextExecutionNumber(ctx, name, sequence.ExecutionTypeTestSuite) +} diff --git a/pkg/scheduler/test_scheduler.go b/pkg/scheduler/test_scheduler.go index 9a654c49d55..99690cb91ab 100644 --- a/pkg/scheduler/test_scheduler.go +++ b/pkg/scheduler/test_scheduler.go @@ -50,7 +50,7 @@ func (s *Scheduler) executeTest(ctx context.Context, test testkube.Test, request request.Name = test.ExecutionRequest.Name } - request.Number = s.getNextExecutionNumber(test.Name) + request.Number = s.getNextTestExecutionNumber(test.Name) if request.Name == "" { request.Name = fmt.Sprintf("%s-%d", test.Name, request.Number) } @@ -192,7 +192,7 @@ func (s *Scheduler) getExecutor(testName string) client.Executor { } } -func (s *Scheduler) getNextExecutionNumber(testName string) int32 { +func (s *Scheduler) getNextTestExecutionNumber(testName string) int32 { number, err := s.testResults.GetNextExecutionNumber(context.Background(), testName) if err != nil { s.logger.Errorw("retrieving latest execution", "error", err) @@ -202,6 +202,16 @@ func (s *Scheduler) getNextExecutionNumber(testName string) int32 { return number } +func (s *Scheduler) getNextTestSuiteExecutionNumber(testSuiteName string) int32 { + number, err := s.testsuiteResults.GetNextExecutionNumber(context.Background(), testSuiteName) + if err != nil { + s.logger.Errorw("retrieving latest execution", "error", err) + return number + } + + return number +} + // createSecretsReferences strips secrets from text and store it inside model as reference to secret func (s *Scheduler) createSecretsReferences(execution *testkube.Execution, options *client.ExecuteOptions) (err error) { secrets := map[string]string{} diff --git a/pkg/scheduler/testsuite_scheduler.go b/pkg/scheduler/testsuite_scheduler.go index 6d93ad9d94c..40804b80df5 100644 --- a/pkg/scheduler/testsuite_scheduler.go +++ b/pkg/scheduler/testsuite_scheduler.go @@ -113,7 +113,7 @@ func (s *Scheduler) executeTestSuite(ctx context.Context, testSuite testkube.Tes s.logger.Infow("Executing testsuite", "test", testSuite.Name, "request", request, "ExecutionRequest", testSuite.ExecutionRequest) - request.Number = s.getNextExecutionNumber("ts-" + testSuite.Name) + request.Number = s.getNextTestSuiteExecutionNumber(testSuite.Name) if request.Name == "" { request.Name = fmt.Sprintf("ts-%s-%d", testSuite.Name, request.Number) } From e0b39a4cc8af78a6fe82af0b4b0c703956e52d62 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 23 Jul 2024 16:47:01 +0300 Subject: [PATCH 06/17] fix: rename option method Signed-off-by: Vladislav Sukhin --- cmd/api-server/main.go | 4 ++-- pkg/repository/result/mongo.go | 2 +- pkg/repository/testresult/mongo.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 00b8fc1cb75..21434447ac6 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -292,9 +292,9 @@ func main() { isDocDb := cfg.APIMongoDBType == storage.TypeDocDB sequenceRepository := sequence.NewMongoRepository(db) mongoResultsRepository := result.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, result.WithFeatureFlags(features), - result.WithLogsClient(logGrpcClient), result.WithMongoRepositorySequenceCollection(sequenceRepository)) + result.WithLogsClient(logGrpcClient), result.WithMongoRepositorySequence(sequenceRepository)) resultsRepository = mongoResultsRepository - testResultsRepository = testresult.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb) + testResultsRepository = testresult.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, testresult.WithMongoRepositorySequence(sequenceRepository)) testWorkflowResultsRepository = testworkflow2.NewMongoRepository(db, cfg.APIMongoAllowDiskUse) configRepository = configrepository.NewMongoRepository(db) triggerLeaseBackend = triggers.NewMongoLeaseBackend(db) diff --git a/pkg/repository/result/mongo.go b/pkg/repository/result/mongo.go index 36292a2380d..8d814acffcf 100644 --- a/pkg/repository/result/mongo.go +++ b/pkg/repository/result/mongo.go @@ -121,7 +121,7 @@ func WithMongoRepositoryResultCollection(collection *mongo.Collection) MongoRepo } } -func WithMongoRepositorySequenceCollection(sequenceRepository sequence.Repository) MongoRepositoryOpt { +func WithMongoRepositorySequence(sequenceRepository sequence.Repository) MongoRepositoryOpt { return func(r *MongoRepository) { r.sequenceRepository = sequenceRepository } diff --git a/pkg/repository/testresult/mongo.go b/pkg/repository/testresult/mongo.go index 13022171b64..3cf5c0111be 100644 --- a/pkg/repository/testresult/mongo.go +++ b/pkg/repository/testresult/mongo.go @@ -51,7 +51,7 @@ func WithMongoRepositoryCollection(collection *mongo.Collection) MongoRepository } } -func WithMongoRepositorySequenceCollection(sequenceRepository sequence.Repository) MongoRepositoryOpt { +func WithMongoRepositorySequence(sequenceRepository sequence.Repository) MongoRepositoryOpt { return func(r *MongoRepository) { r.sequenceRepository = sequenceRepository } From 91636ba442f8d23c9ac29c3f4590325d3caca7e8 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 23 Jul 2024 17:08:49 +0300 Subject: [PATCH 07/17] fix: add sequence for test workflows Signed-off-by: Vladislav Sukhin --- cmd/api-server/main.go | 7 +-- pkg/cloud/data/testworkflow/execution.go | 4 ++ pkg/repository/testworkflow/interface.go | 6 +++ .../testworkflow/mock_repository.go | 15 +++++++ pkg/repository/testworkflow/mongo.go | 45 +++++++++++++++++-- .../testworkflowexecutor/executor.go | 11 +++-- 6 files changed, 76 insertions(+), 12 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 21434447ac6..86b596d7046 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -294,8 +294,10 @@ func main() { mongoResultsRepository := result.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, result.WithFeatureFlags(features), result.WithLogsClient(logGrpcClient), result.WithMongoRepositorySequence(sequenceRepository)) resultsRepository = mongoResultsRepository - testResultsRepository = testresult.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, testresult.WithMongoRepositorySequence(sequenceRepository)) - testWorkflowResultsRepository = testworkflow2.NewMongoRepository(db, cfg.APIMongoAllowDiskUse) + testResultsRepository = testresult.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, + testresult.WithMongoRepositorySequence(sequenceRepository)) + testWorkflowResultsRepository = testworkflow2.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, + testworkflow2.WithMongoRepositorySequence(sequenceRepository)) configRepository = configrepository.NewMongoRepository(db) triggerLeaseBackend = triggers.NewMongoLeaseBackend(db) minioClient := newStorageClient(cfg) @@ -573,7 +575,6 @@ func main() { testWorkflowTemplatesClient, testWorkflowProcessor, configMapConfig, - resultsRepository, testWorkflowExecutionsClient, testWorkflowsClient, metrics, diff --git a/pkg/cloud/data/testworkflow/execution.go b/pkg/cloud/data/testworkflow/execution.go index 961e59b3f29..69a0eba3762 100644 --- a/pkg/cloud/data/testworkflow/execution.go +++ b/pkg/cloud/data/testworkflow/execution.go @@ -156,3 +156,7 @@ func (r *CloudRepository) GetPreviousFinishedState(ctx context.Context, workflow } return commandResponse.Result, nil } + +func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { + return 0, nil +} diff --git a/pkg/repository/testworkflow/interface.go b/pkg/repository/testworkflow/interface.go index 4f0dbca1b8c..cea8a37053e 100644 --- a/pkg/repository/testworkflow/interface.go +++ b/pkg/repository/testworkflow/interface.go @@ -30,6 +30,7 @@ type Filter interface { //go:generate mockgen -destination=./mock_repository.go -package=testworkflow "github.com/kubeshop/testkube/pkg/repository/testworkflow" Repository type Repository interface { + Sequences // Get gets execution result by id or name Get(ctx context.Context, id string) (testkube.TestWorkflowExecution, error) // GetByNameAndTestWorkflow gets execution result by name @@ -68,6 +69,11 @@ type Repository interface { GetTestWorkflowMetrics(ctx context.Context, name string, limit, last int) (metrics testkube.ExecutionsMetrics, err error) } +type Sequences interface { + // GetNextExecutionNumber gets next execution number by name + GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) +} + //go:generate mockgen -destination=./mock_output_repository.go -package=testworkflow "github.com/kubeshop/testkube/pkg/repository/testworkflow" OutputRepository type OutputRepository interface { // PresignSaveLog builds presigned storage URL to save the output in Minio diff --git a/pkg/repository/testworkflow/mock_repository.go b/pkg/repository/testworkflow/mock_repository.go index 15d4d3cadfa..ddf65acc478 100644 --- a/pkg/repository/testworkflow/mock_repository.go +++ b/pkg/repository/testworkflow/mock_repository.go @@ -188,6 +188,21 @@ func (mr *MockRepositoryMockRecorder) GetLatestByTestWorkflows(arg0, arg1 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestByTestWorkflows", reflect.TypeOf((*MockRepository)(nil).GetLatestByTestWorkflows), arg0, arg1) } +// GetNextExecutionNumber mocks base method. +func (m *MockRepository) GetNextExecutionNumber(arg0 context.Context, arg1 string) (int32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNextExecutionNumber", arg0, arg1) + ret0, _ := ret[0].(int32) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNextExecutionNumber indicates an expected call of GetNextExecutionNumber. +func (mr *MockRepositoryMockRecorder) GetNextExecutionNumber(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNextExecutionNumber", reflect.TypeOf((*MockRepository)(nil).GetNextExecutionNumber), arg0, arg1) +} + // GetPreviousFinishedState mocks base method. func (m *MockRepository) GetPreviousFinishedState(arg0 context.Context, arg1 string, arg2 time.Time) (testkube.TestWorkflowStatus, error) { m.ctrl.T.Helper() diff --git a/pkg/repository/testworkflow/mongo.go b/pkg/repository/testworkflow/mongo.go index ef303a404bc..7728e80e61a 100644 --- a/pkg/repository/testworkflow/mongo.go +++ b/pkg/repository/testworkflow/mongo.go @@ -2,6 +2,7 @@ package testworkflow import ( "context" + "errors" "fmt" "strings" "time" @@ -14,6 +15,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/repository/sequence" ) var _ Repository = (*MongoRepository)(nil) @@ -35,9 +37,10 @@ func NewMongoRepository(db *mongo.Database, allowDiskUse bool, opts ...MongoRepo } type MongoRepository struct { - db *mongo.Database - Coll *mongo.Collection - allowDiskUse bool + db *mongo.Database + Coll *mongo.Collection + allowDiskUse bool + sequenceRepository sequence.Repository } func WithMongoRepositoryCollection(collection *mongo.Collection) MongoRepositoryOpt { @@ -46,6 +49,12 @@ func WithMongoRepositoryCollection(collection *mongo.Collection) MongoRepository } } +func WithMongoRepositorySequence(sequenceRepository sequence.Repository) MongoRepositoryOpt { + return func(r *MongoRepository) { + r.sequenceRepository = sequenceRepository + } +} + type MongoRepositoryOpt func(*MongoRepository) func (r *MongoRepository) Get(ctx context.Context, id string) (result testkube.TestWorkflowExecution, err error) { @@ -360,12 +369,26 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) { // DeleteByTestWorkflow deletes execution results by workflow func (r *MongoRepository) DeleteByTestWorkflow(ctx context.Context, workflowName string) (err error) { + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteExecutionNumber(ctx, workflowName, sequence.ExecutionTypeTestWorkflow) + if err != nil { + return + } + } + _, err = r.Coll.DeleteMany(ctx, bson.M{"workflow.name": workflowName}) return } // DeleteAll deletes all execution results func (r *MongoRepository) DeleteAll(ctx context.Context) (err error) { + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteAllExecutionNumbers(ctx, sequence.ExecutionTypeTestWorkflow) + if err != nil { + return + } + } + _, err = r.Coll.DeleteMany(ctx, bson.M{}) return } @@ -383,6 +406,13 @@ func (r *MongoRepository) DeleteByTestWorkflows(ctx context.Context, workflowNam filter := bson.M{"$or": conditions} + if r.sequenceRepository != nil { + err = r.sequenceRepository.DeleteExecutionNumbers(ctx, workflowNames, sequence.ExecutionTypeTestSuite) + if err != nil { + return + } + } + _, err = r.Coll.DeleteMany(ctx, filter) return } @@ -456,3 +486,12 @@ func (r *MongoRepository) GetPreviousFinishedState(ctx context.Context, testWork return *result.Result.Status, nil } + +// GetNextExecutionNumber gets next execution number by name +func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { + if r.sequenceRepository != nil { + return 0, errors.New("no sequence repository provided") + } + + return r.sequenceRepository.GetNextExecutionNumber(ctx, name, sequence.ExecutionTypeTestWorkflow) +} diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go index b30f359c6d6..c5d2c3761ec 100644 --- a/pkg/testworkflows/testworkflowexecutor/executor.go +++ b/pkg/testworkflows/testworkflowexecutor/executor.go @@ -27,7 +27,6 @@ import ( "github.com/kubeshop/testkube/pkg/log" testworkflowmappers "github.com/kubeshop/testkube/pkg/mapper/testworkflows" configRepo "github.com/kubeshop/testkube/pkg/repository/config" - "github.com/kubeshop/testkube/pkg/repository/result" "github.com/kubeshop/testkube/pkg/repository/testworkflow" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor" @@ -51,7 +50,6 @@ type executor struct { testWorkflowTemplatesClient testworkflowsclientv1.TestWorkflowTemplatesInterface processor testworkflowprocessor.Processor configMap configRepo.Repository - executionResults result.Repository testWorkflowExecutionsClient testworkflowsclientv1.TestWorkflowExecutionsInterface testWorkflowsClient testworkflowsclientv1.Interface metrics v1.Metrics @@ -73,7 +71,6 @@ func New(emitter *event.Emitter, testWorkflowTemplatesClient testworkflowsclientv1.TestWorkflowTemplatesInterface, processor testworkflowprocessor.Processor, configMap configRepo.Repository, - executionResults result.Repository, testWorkflowExecutionsClient testworkflowsclientv1.TestWorkflowExecutionsInterface, testWorkflowsClient testworkflowsclientv1.Interface, metrics v1.Metrics, @@ -92,7 +89,6 @@ func New(emitter *event.Emitter, testWorkflowTemplatesClient: testWorkflowTemplatesClient, processor: processor, configMap: configMap, - executionResults: executionResults, testWorkflowExecutionsClient: testWorkflowExecutionsClient, testWorkflowsClient: testWorkflowsClient, metrics: metrics, @@ -465,8 +461,11 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor } // Load execution identifier data - // TODO: Consider if that should not be shared (as now it is between Tests and Test Suites) - number, _ := e.executionResults.GetNextExecutionNumber(context.Background(), workflow.Name) + number, err := e.repository.GetNextExecutionNumber(context.Background(), workflow.Name) + if err != nil { + log.DefaultLogger.Errorw("failed to retrieve TestWorkflow execution number", "id", id, "error", err) + } + executionName := request.Name if executionName == "" { executionName = fmt.Sprintf("%s-%d", workflow.Name, number) From cbb9cf5117aacd9a423c1868157eb779da5aeea8 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 23 Jul 2024 20:07:32 +0300 Subject: [PATCH 08/17] fix: skip crd deletion Signed-off-by: Vladislav Sukhin --- api/v1/testkube.yaml | 16 ++++++++++++++++ internal/app/api/v1/tests.go | 24 +++++++++++++----------- internal/app/api/v1/testsuites.go | 18 ++++++++++-------- internal/app/api/v1/testworkflows.go | 13 ++++++++----- 4 files changed, 47 insertions(+), 24 deletions(-) diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index eed82a971e0..0df52c86a41 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -556,6 +556,7 @@ paths: - api parameters: - $ref: "#/components/parameters/ID" + - $ref: "#/components/parameters/SkipDeleteCRD" summary: "Delete test suite" description: "Deletes a test suite" operationId: deleteTestSuite @@ -1664,6 +1665,7 @@ paths: parameters: - $ref: "#/components/parameters/ID" - $ref: "#/components/parameters/SkipDeleteExecutions" + - $ref: "#/components/parameters/SkipDeleteCRD" summary: "Delete test" description: "Deletes a test" operationId: deleteTest @@ -3351,6 +3353,7 @@ paths: - api parameters: - $ref: "#/components/parameters/Namespace" + - $ref: "#/components/parameters/ID" summary: "Delete secret" description: "Delete secret in the cluster" operationId: deleteSecret @@ -3393,6 +3396,8 @@ paths: tags: - secrets - api + parameters: + - $ref: "#/components/parameters/ID" summary: "Update secret" description: "Update secret in the cluster" operationId: updateSecret @@ -3446,6 +3451,8 @@ paths: tags: - secrets - api + parameters: + - $ref: "#/components/parameters/ID" summary: "Get secret" description: "Get secret in the cluster" operationId: getSecret @@ -4424,6 +4431,7 @@ paths: parameters: - $ref: "#/components/parameters/ID" - $ref: "#/components/parameters/SkipDeleteExecutions" + - $ref: "#/components/parameters/SkipDeleteCRD" summary: Delete test workflow description: Delete test workflow from the kubernetes cluster operationId: deleteTestWorkflow @@ -10473,6 +10481,14 @@ components: default: false description: flag to request all resources required: false + SkipDeleteCRD: + in: query + name: skipDeleteCRD + schema: + type: boolean + default: false + description: dont delete CRD + required: false requestBodies: UploadsBody: description: "Upload files request body data" diff --git a/internal/app/api/v1/tests.go b/internal/app/api/v1/tests.go index cb0ae7cb44c..77082906c73 100644 --- a/internal/app/api/v1/tests.go +++ b/internal/app/api/v1/tests.go @@ -475,23 +475,25 @@ func (s TestkubeAPI) DeleteTestHandler() fiber.Handler { return s.Error(c, http.StatusBadRequest, fmt.Errorf("failed to delete test: id cannot be empty")) } errPrefix := fmt.Sprintf("failed to delete test %s", name) - err := s.TestsClient.Delete(name) - if err != nil { - if errors.IsNotFound(err) { - return s.Warn(c, http.StatusNotFound, fmt.Errorf("%s: client could not find test: %w", errPrefix, err)) - } + skipCRD := c.Query("skipDeleteCRD", "") + if skipCRD != "true" { + err := s.TestsClient.Delete(name) + if err != nil { + if errors.IsNotFound(err) { + return s.Warn(c, http.StatusNotFound, fmt.Errorf("%s: client could not find test: %w", errPrefix, err)) + } - if _, ok := err.(*testsclientv3.DeleteDependenciesError); ok { - return s.Warn(c, http.StatusInternalServerError, fmt.Errorf("client deleted test %s but deleting test dependencies(secrets) returned errors: %w", name, err)) - } + if _, ok := err.(*testsclientv3.DeleteDependenciesError); ok { + return s.Warn(c, http.StatusInternalServerError, fmt.Errorf("client deleted test %s but deleting test dependencies(secrets) returned errors: %w", name, err)) + } - return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: client could not delete test: %w", errPrefix, err)) + return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: client could not delete test: %w", errPrefix, err)) + } } - skipExecutions := c.Query("skipDeleteExecutions", "") if skipExecutions != "true" { // delete executions for test - if err = s.ExecutionResults.DeleteByTest(c.Context(), name); err != nil { + if err := s.ExecutionResults.DeleteByTest(c.Context(), name); err != nil { return s.Warn(c, http.StatusInternalServerError, fmt.Errorf("test %s was deleted but deleting test executions returned error: %w", name, err)) } } diff --git a/internal/app/api/v1/testsuites.go b/internal/app/api/v1/testsuites.go index 876166b2299..0e7fb73f27e 100644 --- a/internal/app/api/v1/testsuites.go +++ b/internal/app/api/v1/testsuites.go @@ -246,23 +246,25 @@ func (s TestkubeAPI) DeleteTestSuiteHandler() fiber.Handler { return func(c *fiber.Ctx) error { name := c.Params("id") errPrefix := fmt.Sprintf("failed to delete test suite %s", name) + skipCRD := c.Query("skipDeleteCRD", "") + if skipCRD != "true" { + err := s.TestsSuitesClient.Delete(name) + if err != nil { + if errors.IsNotFound(err) { + return s.Warn(c, http.StatusNotFound, fmt.Errorf("%s: test suite not found: %w", errPrefix, err)) + } - err := s.TestsSuitesClient.Delete(name) - if err != nil { - if errors.IsNotFound(err) { - return s.Warn(c, http.StatusNotFound, fmt.Errorf("%s: test suite not found: %w", errPrefix, err)) + return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: client could not delete test suite: %w", errPrefix, err)) } - - return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: client could not delete test suite: %w", errPrefix, err)) } // delete executions for test - if err = s.ExecutionResults.DeleteByTestSuite(c.Context(), name); err != nil { + if err := s.ExecutionResults.DeleteByTestSuite(c.Context(), name); err != nil { return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: client could not delete test suite test executions: %w", errPrefix, err)) } // delete executions for test suite - if err = s.TestExecutionResults.DeleteByTestSuite(c.Context(), name); err != nil { + if err := s.TestExecutionResults.DeleteByTestSuite(c.Context(), name); err != nil { return s.Error(c, http.StatusBadGateway, fmt.Errorf("%s: client could not delete test suite executions: %w", errPrefix, err)) } diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index 4bea36afb7b..647b5f7ef15 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -55,14 +55,17 @@ func (s *TestkubeAPI) DeleteTestWorkflowHandler() fiber.Handler { return func(c *fiber.Ctx) error { name := c.Params("id") errPrefix := fmt.Sprintf("failed to delete test workflow '%s'", name) - err := s.TestWorkflowsClient.Delete(name) - s.Metrics.IncDeleteTestWorkflow(err) - if err != nil { - return s.ClientError(c, errPrefix, err) + skipCRD := c.Query("skipDeleteCRD", "") + if skipCRD != "true" { + err := s.TestWorkflowsClient.Delete(name) + s.Metrics.IncDeleteTestWorkflow(err) + if err != nil { + return s.ClientError(c, errPrefix, err) + } } skipExecutions := c.Query("skipDeleteExecutions", "") if skipExecutions != "true" { - err = s.TestWorkflowOutput.DeleteOutputByTestWorkflow(context.Background(), name) + err := s.TestWorkflowOutput.DeleteOutputByTestWorkflow(context.Background(), name) if err != nil { return s.ClientError(c, "deleting executions output", err) } From 5224617cabad002ea1490c581ccc0a4d355f379d Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 00:10:40 +0300 Subject: [PATCH 09/17] fixL use insert instead of update Signed-off-by: Vladislav Sukhin --- pkg/repository/sequence/mongo.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/repository/sequence/mongo.go b/pkg/repository/sequence/mongo.go index d4ea759066f..be29a5ee0cc 100644 --- a/pkg/repository/sequence/mongo.go +++ b/pkg/repository/sequence/mongo.go @@ -74,14 +74,18 @@ func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name strin ExecutionType: executionType, } - opts := options.FindOneAndUpdate() - opts.SetUpsert(true) - - err = r.Coll.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$set": executionNumber}, opts).Err() - if err != nil && !mongo.IsDuplicateKeyError(err) { + err = r.Coll.FindOne(ctx, bson.M{"_id": id}).Err() + if err != nil && err != mongo.ErrNoDocuments { return 0, err } + if err == mongo.ErrNoDocuments { + _, err = r.Coll.InsertOne(ctx, executionNumber) + if err != nil && !mongo.IsDuplicateKeyError(err) { + return 0, err + } + } + if number != 0 { _, err = r.Coll.DeleteOne(ctx, bson.M{"name": oldName}) if err != nil { @@ -89,6 +93,7 @@ func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name strin } } + opts := options.FindOneAndUpdate() opts.SetUpsert(false) opts.SetReturnDocument(options.After) From cfecdb66eb1b678d4c42c7060ce0ddb28c2a9135 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 13:52:06 +0300 Subject: [PATCH 10/17] fix: integration test sync Signed-off-by: Vladislav Sukhin --- .../sequence/mongo_integration_test.go | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 pkg/repository/sequence/mongo_integration_test.go diff --git a/pkg/repository/sequence/mongo_integration_test.go b/pkg/repository/sequence/mongo_integration_test.go new file mode 100644 index 00000000000..7fff6ff63f5 --- /dev/null +++ b/pkg/repository/sequence/mongo_integration_test.go @@ -0,0 +1,58 @@ +package sequence + +import ( + "context" + "testing" + + "github.com/kubeshop/testkube/internal/config" + "github.com/kubeshop/testkube/pkg/utils/test" + + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +var ( + cfg, _ = config.Get() +) + +func TestNewMongoRepository_GetNextExecutionNumber_Sequential_Integration(t *testing.T) { + test.IntegrationTest(t) + + ctx := context.Background() + + client, err := mongo.Connect(ctx, options.Client().ApplyURI(cfg.APIMongoDSN)) + if err != nil { + t.Fatalf("error connecting to mongo: %v", err) + } + db := client.Database("sequence-mongo-repository-test") + t.Cleanup(func() { + db.Drop(ctx) + }) + + repo := NewMongoRepository(db) + + num1, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTest) + assert.NoError(t, err) + assert.Equal(t, 1, num1) + + num2, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTest) + assert.NoError(t, err) + assert.Equal(t, 2, num2) + + num3, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestSuite) + assert.NoError(t, err) + assert.Equal(t, 1, num3) + + num4, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestSuite) + assert.NoError(t, err) + assert.Equal(t, 2, num4) + + num5, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestWorkflow) + assert.NoError(t, err) + assert.Equal(t, 1, num5) + + num6, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestWorkflow) + assert.NoError(t, err) + assert.Equal(t, 2, num6) +} From 164a766fee93d3f8424d97a3adea549135a789e3 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 14:08:18 +0300 Subject: [PATCH 11/17] fix: intergrattion test Signed-off-by: Vladislav Sukhin --- pkg/repository/sequence/mongo.go | 2 +- .../sequence/mongo_integration_test.go | 56 +++++++++++-------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/pkg/repository/sequence/mongo.go b/pkg/repository/sequence/mongo.go index be29a5ee0cc..daac5baac05 100644 --- a/pkg/repository/sequence/mongo.go +++ b/pkg/repository/sequence/mongo.go @@ -98,7 +98,7 @@ func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name strin opts.SetReturnDocument(options.After) err = r.Coll.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$inc": bson.M{"number": 1}}, opts).Decode(&executionNumber) - if err == nil { + if err != nil { return 0, err } diff --git a/pkg/repository/sequence/mongo_integration_test.go b/pkg/repository/sequence/mongo_integration_test.go index 7fff6ff63f5..23a11d64345 100644 --- a/pkg/repository/sequence/mongo_integration_test.go +++ b/pkg/repository/sequence/mongo_integration_test.go @@ -32,27 +32,39 @@ func TestNewMongoRepository_GetNextExecutionNumber_Sequential_Integration(t *tes repo := NewMongoRepository(db) - num1, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTest) - assert.NoError(t, err) - assert.Equal(t, 1, num1) - - num2, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTest) - assert.NoError(t, err) - assert.Equal(t, 2, num2) - - num3, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestSuite) - assert.NoError(t, err) - assert.Equal(t, 1, num3) - - num4, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestSuite) - assert.NoError(t, err) - assert.Equal(t, 2, num4) - - num5, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestWorkflow) - assert.NoError(t, err) - assert.Equal(t, 1, num5) + var tests = []struct { + expectedValue int32 + executionType ExecutionType + }{ + { + 1, + ExecutionTypeTest, + }, + { + 2, + ExecutionTypeTest, + }, + { + 1, + ExecutionTypeTestSuite, + }, + { + 2, + ExecutionTypeTestSuite, + }, + { + 1, + ExecutionTypeTestWorkflow, + }, + { + 2, + ExecutionTypeTestWorkflow, + }, + } - num6, err := repo.GetNextExecutionNumber(ctx, "name", ExecutionTypeTestWorkflow) - assert.NoError(t, err) - assert.Equal(t, 2, num6) + for _, tt := range tests { + num, err := repo.GetNextExecutionNumber(ctx, "name", tt.executionType) + assert.NoError(t, err) + assert.Equal(t, tt.expectedValue, num) + } } From 1020ba095847e5090e90d5eba77966d6fcade7f3 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 14:53:27 +0300 Subject: [PATCH 12/17] fix: parallel integration test Signed-off-by: Vladislav Sukhin --- .../sequence/mongo_integration_test.go | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/pkg/repository/sequence/mongo_integration_test.go b/pkg/repository/sequence/mongo_integration_test.go index 23a11d64345..03196cb657d 100644 --- a/pkg/repository/sequence/mongo_integration_test.go +++ b/pkg/repository/sequence/mongo_integration_test.go @@ -2,6 +2,8 @@ package sequence import ( "context" + "fmt" + "sync" "testing" "github.com/kubeshop/testkube/internal/config" @@ -68,3 +70,77 @@ func TestNewMongoRepository_GetNextExecutionNumber_Sequential_Integration(t *tes assert.Equal(t, tt.expectedValue, num) } } + +func TestNewMongoRepository_GetNextExecutionNumber_Parallel_Integration(t *testing.T) { + test.IntegrationTest(t) + + ctx := context.Background() + + client, err := mongo.Connect(ctx, options.Client().ApplyURI(cfg.APIMongoDSN)) + if err != nil { + t.Fatalf("error connecting to mongo: %v", err) + } + db := client.Database("sequence-mongo-repository-test") + t.Cleanup(func() { + db.Drop(ctx) + }) + + repo := NewMongoRepository(db) + + var tests = []struct { + expectedValue int32 + executionType ExecutionType + }{ + { + 1, + ExecutionTypeTest, + }, + { + 2, + ExecutionTypeTest, + }, + { + 1, + ExecutionTypeTestSuite, + }, + { + 2, + ExecutionTypeTestSuite, + }, + { + 1, + ExecutionTypeTestWorkflow, + }, + { + 2, + ExecutionTypeTestWorkflow, + }, + } + + var results sync.Map + var wg sync.WaitGroup + + for i := range tests { + wg.Add(1) + go func(executionType ExecutionType) { + defer wg.Done() + + num, err := repo.GetNextExecutionNumber(ctx, "name", executionType) + assert.NoError(t, err) + + results.Store(fmt.Sprintf("%s_%d", executionType, num), num) + }(tests[i].executionType) + } + + wg.Wait() + + for _, tt := range tests { + num, ok := results.Load(fmt.Sprintf("%s_%d", tt.executionType, tt.expectedValue)) + assert.Equal(t, true, ok) + + value, ok := num.(int32) + assert.Equal(t, true, ok) + + assert.Subset(t, []int32{1, 2}, []int32{value}) + } +} From 11bd2261d30568c1c9a1fdee602737d2ee489c9e Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 15:15:59 +0300 Subject: [PATCH 13/17] fix: use default value Signed-off-by: Vladislav Sukhin --- pkg/repository/sequence/mongo.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/repository/sequence/mongo.go b/pkg/repository/sequence/mongo.go index daac5baac05..607255a737e 100644 --- a/pkg/repository/sequence/mongo.go +++ b/pkg/repository/sequence/mongo.go @@ -94,7 +94,6 @@ func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name strin } opts := options.FindOneAndUpdate() - opts.SetUpsert(false) opts.SetReturnDocument(options.After) err = r.Coll.FindOneAndUpdate(ctx, bson.M{"_id": id}, bson.M{"$inc": bson.M{"number": 1}}, opts).Decode(&executionNumber) From 7c49c4097b4a1950a961eca3633b0b9a6f3079be Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 16:01:11 +0300 Subject: [PATCH 14/17] fix: typo nil check Signed-off-by: Vladislav Sukhin --- pkg/repository/result/mongo.go | 2 +- pkg/repository/testresult/mongo.go | 2 +- pkg/repository/testworkflow/mongo.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/repository/result/mongo.go b/pkg/repository/result/mongo.go index 8d814acffcf..aeffea989b4 100644 --- a/pkg/repository/result/mongo.go +++ b/pkg/repository/result/mongo.go @@ -899,7 +899,7 @@ func (r *MongoRepository) GetPreviousFinishedState(ctx context.Context, testName // GetNextExecutionNumber gets next execution number by name func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { - if r.sequenceRepository != nil { + if r.sequenceRepository == nil { return 0, errors.New("no sequence repository provided") } diff --git a/pkg/repository/testresult/mongo.go b/pkg/repository/testresult/mongo.go index 3cf5c0111be..ada95dc9820 100644 --- a/pkg/repository/testresult/mongo.go +++ b/pkg/repository/testresult/mongo.go @@ -597,7 +597,7 @@ func (r *MongoRepository) GetPreviousFinishedState(ctx context.Context, testSuit // GetNextExecutionNumber gets next execution number by name func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { - if r.sequenceRepository != nil { + if r.sequenceRepository == nil { return 0, errors.New("no sequence repository provided") } diff --git a/pkg/repository/testworkflow/mongo.go b/pkg/repository/testworkflow/mongo.go index 7728e80e61a..4d6183ef160 100644 --- a/pkg/repository/testworkflow/mongo.go +++ b/pkg/repository/testworkflow/mongo.go @@ -489,7 +489,7 @@ func (r *MongoRepository) GetPreviousFinishedState(ctx context.Context, testWork // GetNextExecutionNumber gets next execution number by name func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { - if r.sequenceRepository != nil { + if r.sequenceRepository == nil { return 0, errors.New("no sequence repository provided") } From de1b9fffc64428c8d6d0be1d08653f0560907723 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 16:48:49 +0300 Subject: [PATCH 15/17] fix: change prefix Signed-off-by: Vladislav Sukhin --- pkg/repository/sequence/mongo.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/repository/sequence/mongo.go b/pkg/repository/sequence/mongo.go index 607255a737e..63fa03eda22 100644 --- a/pkg/repository/sequence/mongo.go +++ b/pkg/repository/sequence/mongo.go @@ -163,7 +163,7 @@ func (r *MongoRepository) getOldNumber(ctx context.Context, name string) (int32, } func getMongoId(name string, executionType ExecutionType) string { - return fmt.Sprintf("%s-%s", name, executionType) + return fmt.Sprintf("%s-%s", executionType, name) } func getOldName(name string, executionType ExecutionType) string { From 16b8cb2670d335e62bb6087e5fd1bc7bb85a9d68 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 24 Jul 2024 17:22:57 +0300 Subject: [PATCH 16/17] fix: test suite prefix Signed-off-by: Vladislav Sukhin --- pkg/repository/sequence/mongo.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/repository/sequence/mongo.go b/pkg/repository/sequence/mongo.go index 63fa03eda22..5ec90bb5417 100644 --- a/pkg/repository/sequence/mongo.go +++ b/pkg/repository/sequence/mongo.go @@ -138,7 +138,7 @@ func (r *MongoRepository) DeleteExecutionNumbers(ctx context.Context, names []st // DeleteAllExecutionNumbers deletes all execution numbers by type func (r *MongoRepository) DeleteAllExecutionNumbers(ctx context.Context, executionType ExecutionType) (err error) { isTestSuite := false - if executionType == "testsuite" { + if executionType == ExecutionTypeTestSuite { isTestSuite = true } @@ -168,7 +168,7 @@ func getMongoId(name string, executionType ExecutionType) string { func getOldName(name string, executionType ExecutionType) string { oldPrefix := "" - if executionType == "testsuite" { + if executionType == ExecutionTypeTestSuite { oldPrefix = "ts-" } From 251a8dab4f7a352c7a0b0a0b4a903a3582511391 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Thu, 25 Jul 2024 19:40:43 +0300 Subject: [PATCH 17/17] fix: new cloud methods Signed-off-by: Vladislav Sukhin --- pkg/cloud/data/testresult/commands.go | 1 + pkg/cloud/data/testresult/testresult.go | 13 +++++++++-- .../data/testresult/testresult_models.go | 8 +++++++ pkg/cloud/data/testresult/testresult_test.go | 23 +++++++++++++++++++ pkg/cloud/data/testworkflow/commands.go | 3 +++ pkg/cloud/data/testworkflow/execution.go | 13 +++++++++-- .../data/testworkflow/execution_models.go | 8 +++++++ 7 files changed, 65 insertions(+), 4 deletions(-) diff --git a/pkg/cloud/data/testresult/commands.go b/pkg/cloud/data/testresult/commands.go index 1133385049a..4b2d1a87607 100644 --- a/pkg/cloud/data/testresult/commands.go +++ b/pkg/cloud/data/testresult/commands.go @@ -18,4 +18,5 @@ const ( CmdTestResultDeleteAll executor.Command = "test_result_delete_all" CmdTestResultDeleteByTestSuites executor.Command = "test_result_delete_by_test_suites" CmdTestResultGetTestSuiteMetrics executor.Command = "test_result_get_test_suite_metrics" + CmdTestResultGetNextExecutionNumber executor.Command = "test_result_get_next_execution_number" ) diff --git a/pkg/cloud/data/testresult/testresult.go b/pkg/cloud/data/testresult/testresult.go index 71e79908530..9f5c5cd4ba5 100644 --- a/pkg/cloud/data/testresult/testresult.go +++ b/pkg/cloud/data/testresult/testresult.go @@ -253,6 +253,15 @@ func (r *CloudRepository) GetPreviousFinishedState(ctx context.Context, testSuit return commandResponse.Result, nil } -func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { - return 0, nil +func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, testSuiteName string) (number int32, err error) { + req := NextExecutionNumberRequest{TestSuiteName: testSuiteName} + response, err := r.executor.Execute(ctx, CmdTestResultGetNextExecutionNumber, req) + if err != nil { + return 0, err + } + var commandResponse NextExecutionNumberResponse + if err := json.Unmarshal(response, &commandResponse); err != nil { + return 0, err + } + return commandResponse.TestSuiteNumber, nil } diff --git a/pkg/cloud/data/testresult/testresult_models.go b/pkg/cloud/data/testresult/testresult_models.go index 5f3379f52d9..5c87e25ce4c 100644 --- a/pkg/cloud/data/testresult/testresult_models.go +++ b/pkg/cloud/data/testresult/testresult_models.go @@ -118,3 +118,11 @@ type GetTestSuiteMetricsRequest struct { type GetTestSuiteMetricsResponse struct { Metrics testkube.ExecutionsMetrics `json:"metrics"` } + +type NextExecutionNumberRequest struct { + TestSuiteName string `json:"testSuiteName"` +} + +type NextExecutionNumberResponse struct { + TestSuiteNumber int32 `json:"testSuiteNumber"` +} diff --git a/pkg/cloud/data/testresult/testresult_test.go b/pkg/cloud/data/testresult/testresult_test.go index 8eedd613dce..8ad878b3b25 100644 --- a/pkg/cloud/data/testresult/testresult_test.go +++ b/pkg/cloud/data/testresult/testresult_test.go @@ -124,3 +124,26 @@ func TestCloudResultRepository_GetPreviousFinishedState(t *testing.T) { assert.NoError(t, err) assert.Equal(t, *expectedStatus, status) } + +func TestCloudResultRepository_GetNextExecutionNumber(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockExecutor := executor.NewMockExecutor(ctrl) + + testSuiteName := "testsuite-1" + var testSuiteNumber int32 = 3 + + // Setup expectations for the mockedExecutor.Execute method + expectedReq := NextExecutionNumberRequest{TestSuiteName: testSuiteName} + expectedResponse, _ := json.Marshal(&NextExecutionNumberResponse{TestSuiteNumber: testSuiteNumber}) + mockExecutor.EXPECT().Execute(gomock.Any(), CmdTestResultGetNextExecutionNumber, expectedReq).Return(expectedResponse, nil) + + r := &CloudRepository{executor: mockExecutor} + + result, err := r.GetNextExecutionNumber(ctx, testSuiteName) + assert.NoError(t, err) + assert.Equal(t, testSuiteNumber, result) +} diff --git a/pkg/cloud/data/testworkflow/commands.go b/pkg/cloud/data/testworkflow/commands.go index 73d5c8c7beb..fa2291483bc 100644 --- a/pkg/cloud/data/testworkflow/commands.go +++ b/pkg/cloud/data/testworkflow/commands.go @@ -21,6 +21,7 @@ const ( CmdTestWorkflowExecutionDeleteAll executor.Command = "workflow_execution_delete_all" CmdTestWorkflowExecutionDeleteByWorkflows executor.Command = "workflow_execution_delete_by_workflows" CmdTestWorkflowExecutionGetWorkflowMetrics executor.Command = "workflow_execution_get_workflow_metrics" + CmdTestWorkflowExecutionGetNextExecutionNumber executor.Command = "workflow_execution_get_next_execution_number" CmdTestWorkflowOutputPresignSaveLog executor.Command = "workflow_output_presign_save_log" CmdTestWorkflowOutputPresignReadLog executor.Command = "workflow_output_presign_read_log" @@ -65,6 +66,8 @@ func command(v interface{}) executor.Command { return CmdTestWorkflowExecutionDeleteByWorkflows case ExecutionGetWorkflowMetricsRequest: return CmdTestWorkflowExecutionGetWorkflowMetrics + case ExecutionGetNextExecutionNumberRequest: + return CmdTestWorkflowExecutionGetNextExecutionNumber case OutputPresignSaveLogRequest: return CmdTestWorkflowOutputPresignSaveLog diff --git a/pkg/cloud/data/testworkflow/execution.go b/pkg/cloud/data/testworkflow/execution.go index 69a0eba3762..a0dd8b69d0b 100644 --- a/pkg/cloud/data/testworkflow/execution.go +++ b/pkg/cloud/data/testworkflow/execution.go @@ -157,6 +157,15 @@ func (r *CloudRepository) GetPreviousFinishedState(ctx context.Context, workflow return commandResponse.Result, nil } -func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) { - return 0, nil +func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, testWorkflowName string) (number int32, err error) { + req := ExecutionGetNextExecutionNumberRequest{TestWorkflowName: testWorkflowName} + response, err := r.executor.Execute(ctx, CmdTestWorkflowExecutionGetNextExecutionNumber, req) + if err != nil { + return 0, err + } + var commandResponse ExecutionGetNextExecutionNumberResponse + if err := json.Unmarshal(response, &commandResponse); err != nil { + return 0, err + } + return commandResponse.TestWorkflowNumber, nil } diff --git a/pkg/cloud/data/testworkflow/execution_models.go b/pkg/cloud/data/testworkflow/execution_models.go index 01eea60feb9..25834ad6de8 100644 --- a/pkg/cloud/data/testworkflow/execution_models.go +++ b/pkg/cloud/data/testworkflow/execution_models.go @@ -170,3 +170,11 @@ type ExecutionsAddReportRequest struct { } type ExecutionsAddReportResponse struct{} + +type ExecutionGetNextExecutionNumberRequest struct { + TestWorkflowName string `json:"testWorkflowName"` +} + +type ExecutionGetNextExecutionNumberResponse struct { + TestWorkflowNumber int32 `json:"testWorkflowNumber"` +}