Skip to content

Commit

Permalink
Update mongo drivers from mgo to mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
lbeckman314 committed Sep 27, 2023
1 parent 2bd6757 commit 7e7c564
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 71 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ test-elasticsearch:

start-mongodb:
@docker rm -f funnel-mongodb-test > /dev/null 2>&1 || echo
@docker run -d --name funnel-mongodb-test -p 27000:27017 docker.io/mongo:3.5.13 > /dev/null
@docker run -d --name funnel-mongodb-test -p 27000:27017 docker.io/mongo > /dev/null

test-mongodb:
@go test ./tests/core/ --funnel-config `pwd`/tests/mongo.config.yml
Expand Down Expand Up @@ -164,7 +164,7 @@ test-htcondor:
@go test -timeout 120s ./tests/htcondor -funnel-config `pwd`/tests/htcondor.config.yml

test-slurm:
@docker pull quay.io/ohsu-comp-bio/slurm
@docker pull https://ohsu-comp-bio.github.io/funnel-compliance/quay.io/ohsu-comp-bio/slurm
@go test -timeout 120s ./tests/slurm -funnel-config `pwd`/tests/slurm.config.yml

test-gridengine:
Expand Down
27 changes: 18 additions & 9 deletions database/mongodb/counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package mongodb
import (
"context"

"github.com/globalsign/mgo/bson"
"github.com/ohsu-comp-bio/funnel/tes"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

type stateCount struct {
Expand All @@ -14,16 +15,24 @@ type stateCount struct {

// TaskStateCounts returns the number of tasks in each state.
func (db *MongoDB) TaskStateCounts(ctx context.Context) (map[string]int32, error) {
sess := db.sess.Copy()
defer sess.Close()

pipe := db.tasks(sess).Pipe([]bson.M{
{"$sort": bson.M{"state": 1}},
{"$group": bson.M{"_id": "$state", "count": bson.M{"$sum": 1}}},
})
stateStage := bson.D{{
"$sort", bson.D{{"state", 1}},
}}

groupStage := bson.D{
{"$group", bson.D{
{"_id", "$state"},
{"count", bson.D{{"$sum", 1}}},
},
}}

cursor, err := db.tasks(db.client).Aggregate(context.TODO(), mongo.Pipeline{stateStage, groupStage})
if err != nil {
return nil, err
}

recs := []stateCount{}
err := pipe.All(&recs)
err = cursor.All(context.TODO(), &recs)
if err != nil {
return nil, err
}
Expand Down
36 changes: 21 additions & 15 deletions database/mongodb/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ import (
"fmt"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/ohsu-comp-bio/funnel/events"
"github.com/ohsu-comp-bio/funnel/tes"
"github.com/ohsu-comp-bio/funnel/util"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)

// WriteEvent creates an event for the server to handle.
func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
sess := db.sess.Copy()
defer sess.Close()
tasks := db.tasks(sess)
tasks := db.tasks(db.client)

update := bson.M{}
selector := bson.M{"id": req.Id}
Expand All @@ -27,30 +25,30 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
task.Logs = []*tes.TaskLog{
{
Logs: []*tes.ExecutorLog{},
Metadata: map[string]string{},
SystemLogs: []string{},
},
}
return tasks.Insert(&task)
_, err := tasks.InsertOne(context.TODO(), &task)
return err

case events.Type_TASK_STATE:
retrier := util.NewRetrier()
retrier.ShouldRetry = func(err error) bool {
return err == mgo.ErrNotFound
return err == tes.ErrNotFound
}

return retrier.Retry(ctx, func() error {
// get current state & version
current := make(map[string]interface{})
q := tasks.Find(bson.M{"id": req.Id}).Select(bson.M{"state": 1, "version": 1})
err := q.One(&current)
if err == mgo.ErrNotFound {
return tes.ErrNotFound
}
opts := options.FindOne().SetProjection(bson.M{"state": 1, "version": 1})
err := tasks.FindOne(context.TODO(), bson.M{"id": req.Id}, opts).Decode(&current)
if err != nil {
return err
}

// validate state transition
from := tes.State(current["state"].(int))
from := tes.State(current["state"].(int32))
to := req.GetState()
if err = tes.ValidateTransition(from, to); err != nil {
return err
Expand All @@ -59,7 +57,13 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
// apply version restriction and set update
selector["version"] = current["version"]
update = bson.M{"$set": bson.M{"state": to, "version": time.Now().UnixNano()}}
return tasks.Update(selector, update)

result, err := tasks.UpdateOne(context.TODO(), selector, update)
if result.MatchedCount == 0 {
return tes.ErrNotFound
}

return err
})

case events.Type_TASK_START_TIME:
Expand Down Expand Up @@ -133,5 +137,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
}
}

return tasks.Update(selector, update)
opts := options.Update().SetUpsert(true)
_, err := tasks.UpdateOne(context.TODO(), selector, update, opts)
return err
}
21 changes: 7 additions & 14 deletions database/mongodb/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package mongodb
import (
"fmt"

"github.com/globalsign/mgo"
"github.com/ohsu-comp-bio/funnel/compute/scheduler"
"github.com/ohsu-comp-bio/funnel/tes"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
Expand All @@ -15,11 +15,10 @@ import (

// ReadQueue returns a slice of queued Tasks. Up to "n" tasks are returned.
func (db *MongoDB) ReadQueue(n int) []*tes.Task {
defer db.client.Disconnect(context.TODO())

fmt.Println("Reading queue!")
var tasks []*tes.Task
opts := options.Find().SetSort(bson.D{{"creationtime", 1}}).SetLimit(int64(n))
cursor, err := db.tasks(db.client).Find(context.TODO(), bson.D{{"state", tes.State_QUEUED}}, opts)
opts := options.Find().SetSort(bson.M{"creationtime": 1}).SetLimit(int64(n))
cursor, err := db.tasks(db.client).Find(context.TODO(), bson.M{"state": tes.State_QUEUED}, opts)

err = cursor.All(context.TODO(), &tasks)
if err != nil {
Expand All @@ -34,7 +33,6 @@ func (db *MongoDB) ReadQueue(n int) []*tes.Task {
// and status updates, such as completed tasks. The server responds with updated
// information for the node, such as canceled tasks.
func (db *MongoDB) PutNode(ctx context.Context, node *scheduler.Node) (*scheduler.PutNodeResponse, error) {
defer db.client.Disconnect(context.TODO())
nodes := db.nodes(db.client)

q := bson.M{"id": node.Id}
Expand Down Expand Up @@ -64,11 +62,9 @@ func (db *MongoDB) PutNode(ctx context.Context, node *scheduler.Node) (*schedule

// GetNode gets a node
func (db *MongoDB) GetNode(ctx context.Context, req *scheduler.GetNodeRequest) (*scheduler.Node, error) {
defer db.client.Disconnect(context.TODO())

var node scheduler.Node
err := db.nodes(db.client).FindOne(context.TODO(), bson.M{"id": req.Id}).Decode(&node)
if err != nil {
if err == mongo.ErrNoDocuments {
return nil, status.Errorf(codes.NotFound, fmt.Sprintf("%v: nodeID: %s", err, req.Id))
}

Expand All @@ -77,18 +73,15 @@ func (db *MongoDB) GetNode(ctx context.Context, req *scheduler.GetNodeRequest) (

// DeleteNode deletes a node
func (db *MongoDB) DeleteNode(ctx context.Context, req *scheduler.Node) (*scheduler.DeleteNodeResponse, error) {
defer db.client.Disconnect(context.TODO())

_, err := db.nodes(db.client).DeleteOne(context.TODO(), bson.M{"id": req.Id})
if err == mgo.ErrNotFound {
return nil, status.Errorf(codes.NotFound, fmt.Sprintf("%v: nodeID: %s", mgo.ErrNotFound.Error(), req.Id))
if err == mongo.ErrNoDocuments {
return nil, status.Errorf(codes.NotFound, fmt.Sprintf("%v: nodeID: %s", err, req.Id))
}
return nil, err
}

// ListNodes is an API endpoint that returns a list of nodes.
func (db *MongoDB) ListNodes(ctx context.Context, req *scheduler.ListNodesRequest) (*scheduler.ListNodesResponse, error) {
defer db.client.Disconnect(context.TODO())
var nodes []*scheduler.Node
cursor, err := db.nodes(db.client).Find(context.TODO(), nil)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ require (
github.com/gizak/termui v2.3.0+incompatible
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/go-test/deep v1.1.0
github.com/gogo/protobuf v1.3.2
github.com/golang/gddo v0.0.0-20200219175727-df439dd5819e
github.com/golang/protobuf v1.5.3
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2
github.com/hashicorp/go-multierror v1.1.1
github.com/imdario/mergo v0.3.15
github.com/jlaffaye/ftp v0.0.0-20191218041957-e1b8fdd0dcc3
github.com/joho/godotenv v1.5.1
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/kr/pretty v0.3.1
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381
Expand All @@ -43,6 +45,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.2
go.mongodb.org/mongo-driver v1.12.1
golang.org/x/crypto v0.10.0
golang.org/x/net v0.11.0
golang.org/x/oauth2 v0.7.0
Expand All @@ -66,8 +69,6 @@ require (
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/br0xen/boltbrowser v0.0.0-20230531143731-fcc13603daaf // indirect
github.com/br0xen/termbox-util v0.0.0-20170904143325-de1d4c83380e // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
Expand All @@ -84,14 +85,12 @@ require (
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/frankban/quicktest v1.14.3 // indirect
github.com/gammazero/deque v0.0.0-20190521012701-46e4ffb7a622 // indirect
github.com/go-bindata/go-bindata v3.1.2+incompatible // indirect
github.com/go-ini/ini v1.52.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
Expand Down Expand Up @@ -120,6 +119,7 @@ require (
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nsf/termbox-go v1.1.1 // indirect
Expand All @@ -139,8 +139,11 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/sync v0.3.0 // indirect
Expand Down
Loading

0 comments on commit 7e7c564

Please sign in to comment.