Skip to content

Commit

Permalink
Add tes.go to mongodb package
Browse files Browse the repository at this point in the history
  • Loading branch information
lbeckman314 committed Sep 27, 2023
1 parent 7e7c564 commit 68b11b9
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 14 deletions.
6 changes: 3 additions & 3 deletions database/mongodb/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
case events.Type_TASK_STATE:
retrier := util.NewRetrier()
retrier.ShouldRetry = func(err error) bool {
return err == tes.ErrNotFound
return err == tes.ErrConcurrentStateChange
}

return retrier.Retry(ctx, func() error {
Expand All @@ -44,7 +44,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
opts := options.FindOne().SetProjection(bson.M{"state": 1, "version": 1})
err := tasks.FindOne(context.TODO(), bson.M{"id": req.Id}, opts).Decode(&current)
if err != nil {
return err
return tes.ErrNotFound
}

// validate state transition
Expand All @@ -60,7 +60,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {

result, err := tasks.UpdateOne(context.TODO(), selector, update)
if result.MatchedCount == 0 {
return tes.ErrNotFound
return tes.ErrConcurrentStateChange
}

return err
Expand Down
26 changes: 15 additions & 11 deletions database/mongodb/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ type MongoDB struct {
active bool
}

// export MONGODB_URI="mongodb://localhost:27000/?retryWrites=true&w=majority"
// go run mongo-test.go
func NewMongoDB(conf config.MongoDB) (*MongoDB, error) {
client, err := mongo.Connect(
context.TODO(),
Expand Down Expand Up @@ -48,7 +46,6 @@ func (db *MongoDB) nodes(client *mongo.Client) *mongo.Collection {

// Init creates tables in MongoDB.
func (db *MongoDB) Init() error {
defer db.client.Disconnect(context.TODO())
tasks := db.tasks(db.client)
nodes := db.nodes(db.client)

Expand All @@ -73,10 +70,14 @@ func (db *MongoDB) Init() error {
return fmt.Errorf("error creating tasks collection in database %s: %v", db.conf.Database, err)
}

_, err = tasks.Indexes().CreateOne(context.TODO(), mongo.IndexModel{
Keys: bson.D{{"-id", -1}, {"-creationtime", -1}},
Options: options.Index().SetUnique(true).SetSparse(true),
})
indexModel := mongo.IndexModel{
Keys: bson.D{
{"-id", -1},
{"-creationtime", -1},
},
Options: options.Index().SetUnique(true).SetSparse(true),
}
_, err = tasks.Indexes().CreateOne(context.TODO(), indexModel)
if err != nil {
return err
}
Expand All @@ -88,10 +89,13 @@ func (db *MongoDB) Init() error {
return fmt.Errorf("error creating nodes collection in database %s: %v", db.conf.Database, err)
}

_, err = nodes.Indexes().CreateOne(context.TODO(), mongo.IndexModel{
Keys: bson.D{{"-id", -1}, {"-creationtime", -1}},
Options: options.Index().SetUnique(true).SetSparse(true),
})
indexModel := mongo.IndexModel{
Keys: bson.D{
{"-id", -1},
},
Options: options.Index().SetUnique(true).SetSparse(true),
}
_, err = nodes.Indexes().CreateOne(context.TODO(), indexModel)
if err != nil {
return err
}
Expand Down
95 changes: 95 additions & 0 deletions database/mongodb/tes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package mongodb

import (
"fmt"

"github.com/ohsu-comp-bio/funnel/tes"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/net/context"
)

var basicView = bson.M{
"logs.systemlogs": 0,
"logs.logs.stdout": 0,
"logs.logs.stderr": 0,
"inputs.content": 0,
}
var minimalView = bson.M{"id": 1, "state": 1}

// GetTask gets a task, which describes a running task
func (db *MongoDB) GetTask(ctx context.Context, req *tes.GetTaskRequest) (*tes.Task, error) {
var task tes.Task
var opts = options.FindOne()

switch req.View {
case tes.View_BASIC.String():
opts = opts.SetProjection(basicView)
case tes.View_MINIMAL.String():
opts = opts.SetProjection(minimalView)
}

err := db.tasks(db.client).FindOne(context.TODO(), bson.M{"id": req.Id}, opts).Decode(&task)
if err != nil {
return nil, tes.ErrNotFound
}

return &task, nil
}

// ListTasks returns a list of taskIDs
func (db *MongoDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest) (*tes.ListTasksResponse, error) {
pageSize := tes.GetPageSize(req.GetPageSize())

var query = bson.M{}
var err error
if req.PageToken != "" {
query["id"] = bson.M{"$lt": req.PageToken}
}

if req.State != tes.Unknown {
query["state"] = bson.M{"$eq": req.State}
}

if req.NamePrefix != "" {
query["name"] = bson.M{"$regex": fmt.Sprintf("^%s", req.NamePrefix)}
}

for k, v := range req.GetTags() {
if v == "" {
query[fmt.Sprintf("tags.%s", k)] = bson.M{"$exists": true}
} else {
query[fmt.Sprintf("tags.%s", k)] = bson.M{"$eq": v}
}
}

var opts = options.Find().SetSort(bson.M{"creationtime": -1}).SetLimit(int64(pageSize))

switch req.View {
case tes.View_BASIC.String():
opts = opts.SetProjection(basicView)
case tes.View_MINIMAL.String():
opts = opts.SetProjection(minimalView)
}

cursor, err := db.tasks(db.client).Find(context.TODO(), query, opts)
if err != nil {
return nil, err
}

var tasks []*tes.Task
err = cursor.All(context.TODO(), &tasks)
if err != nil {
return nil, err
}

out := tes.ListTasksResponse{
Tasks: tasks,
}
// TODO figure out when not to return a next page token
if len(tasks) > 0 {
out.NextPageToken = tasks[len(tasks)-1].Id
}

return &out, nil
}
1 change: 1 addition & 0 deletions tes/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func Base64Decode(raw string) (*Task, error) {

// ErrNotFound is returned when a task is not found.
var ErrNotFound = errors.New("task not found")
var ErrConcurrentStateChange = errors.New("Concurrent stage change")

// Shorthand for task views
const (
Expand Down

0 comments on commit 68b11b9

Please sign in to comment.