Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

database/mongodb: copy session for each op and maintain a connection pool #612

Merged
merged 4 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@
version = "1.12.0"

[[constraint]]
branch = "v2"
name = "gopkg.in/mgo.v2"
version = "r2018.06.15"
name = "github.com/globalsign/mgo"

[[constraint]]
name = "gopkg.in/olivere/elastic.v5"
Expand Down
7 changes: 5 additions & 2 deletions database/mongodb/counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package mongodb
import (
"context"

"github.com/globalsign/mgo/bson"
"github.com/ohsu-comp-bio/funnel/tes"
"gopkg.in/mgo.v2/bson"
)

type stateCount struct {
Expand All @@ -14,7 +14,10 @@ type stateCount struct {

// TaskStateCounts returns the number of tasks in each state.
func (db *MongoDB) TaskStateCounts(ctx context.Context) (map[string]int32, error) {
pipe := db.tasks.Pipe([]bson.M{
sess := db.sess.Copy()
defer sess.Close()

pipe := db.tasks(sess).Pipe([]bson.M{
{"$group": bson.M{"_id": "$state", "count": bson.M{"$sum": 1}}},
})

Expand Down
16 changes: 10 additions & 6 deletions database/mongodb/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ import (
"fmt"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/ohsu-comp-bio/funnel/events"
"github.com/ohsu-comp-bio/funnel/tes"
"github.com/ohsu-comp-bio/funnel/util"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)

// WriteEvent creates an event for the server to handle.
func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
sess := db.sess.Copy()
defer sess.Close()
tasks := db.tasks(sess)

update := bson.M{}
selector := bson.M{"id": req.Id}

Expand All @@ -25,7 +29,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
Logs: []*tes.ExecutorLog{},
},
}
return db.tasks.Insert(&task)
return tasks.Insert(&task)

case events.Type_TASK_STATE:
retrier := util.NewRetrier()
Expand All @@ -39,7 +43,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
return retrier.Retry(ctx, func() error {
// get current state & version
current := make(map[string]interface{})
q := db.tasks.Find(bson.M{"id": req.Id}).Select(bson.M{"state": 1, "version": 1})
q := tasks.Find(bson.M{"id": req.Id}).Select(bson.M{"state": 1, "version": 1})
err := q.One(&current)
if err == mgo.ErrNotFound {
return tes.ErrNotFound
Expand All @@ -58,7 +62,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
// apply version restriction and set update
selector["version"] = current["version"]
update = bson.M{"$set": bson.M{"state": to, "version": time.Now().UnixNano()}}
return db.tasks.Update(selector, update)
return tasks.Update(selector, update)
})

case events.Type_TASK_START_TIME:
Expand Down Expand Up @@ -132,5 +136,5 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
}
}

return db.tasks.Update(selector, update)
return tasks.Update(selector, update)
}
56 changes: 33 additions & 23 deletions database/mongodb/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,56 @@ import (
"fmt"
"time"

"github.com/globalsign/mgo"
"github.com/ohsu-comp-bio/funnel/config"
"gopkg.in/mgo.v2"
)

// MongoDB provides an MongoDB database server backend.
type MongoDB struct {
sess *mgo.Session
conf config.MongoDB
tasks *mgo.Collection
nodes *mgo.Collection
// events *mgo.Collection
sess *mgo.Session
conf config.MongoDB
}

// NewMongoDB returns a new MongoDB instance.
func NewMongoDB(conf config.MongoDB) (*MongoDB, error) {
sess, err := mgo.DialWithInfo(&mgo.DialInfo{
Addrs: conf.Addrs,
Username: conf.Username,
Password: conf.Password,
Database: conf.Database,
Timeout: time.Duration(conf.Timeout),
// DialServer: func(addr *mgo.ServerAddr) (net.Conn, error) {
// return tls.Dial("tcp", addr.String(), &tls.Config{})
// },
Addrs: conf.Addrs,
Username: conf.Username,
Password: conf.Password,
Database: conf.Database,
Timeout: time.Duration(conf.Timeout),
AppName: "funnel",
PoolLimit: 4096,
PoolTimeout: 0, // wait for connection to become available
MinPoolSize: 10,
MaxIdleTimeMS: 120000, // 2 min
})
if err != nil {
return nil, err
}
db := &MongoDB{
sess: sess,
conf: conf,
tasks: sess.DB(conf.Database).C("tasks"),
nodes: sess.DB(conf.Database).C("nodes"),
sess: sess,
conf: conf,
}
return db, nil
}

func (db *MongoDB) tasks(sess *mgo.Session) *mgo.Collection {
return sess.DB(db.conf.Database).C("tasks")
}

func (db *MongoDB) nodes(sess *mgo.Session) *mgo.Collection {
return sess.DB(db.conf.Database).C("nodes")
}

// Init creates tables in MongoDB.
func (db *MongoDB) Init() error {
names, err := db.sess.DB(db.conf.Database).CollectionNames()
sess := db.sess.Copy()
defer sess.Close()
tasks := db.tasks(sess)
nodes := db.nodes(sess)

names, err := sess.DB(db.conf.Database).CollectionNames()
if err != nil {
return fmt.Errorf("error listing collection names in database %s: %v", db.conf.Database, err)
}
Expand All @@ -59,12 +69,12 @@ func (db *MongoDB) Init() error {
}

if !tasksFound {
err = db.tasks.Create(&mgo.CollectionInfo{})
err = tasks.Create(&mgo.CollectionInfo{})
if err != nil {
return fmt.Errorf("error creating tasks collection in database %s: %v", db.conf.Database, err)
}

err = db.tasks.EnsureIndex(mgo.Index{
err = tasks.EnsureIndex(mgo.Index{
Key: []string{"-id", "-creationtime"},
Unique: true,
DropDups: true,
Expand All @@ -77,12 +87,12 @@ func (db *MongoDB) Init() error {
}

if !nodesFound {
err = db.nodes.Create(&mgo.CollectionInfo{})
err = nodes.Create(&mgo.CollectionInfo{})
if err != nil {
return fmt.Errorf("error creating nodes collection in database %s: %v", db.conf.Database, err)
}

err = db.nodes.EnsureIndex(mgo.Index{
err = nodes.EnsureIndex(mgo.Index{
Key: []string{"id"},
Unique: true,
DropDups: true,
Expand Down
32 changes: 24 additions & 8 deletions database/mongodb/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ package mongodb
import (
"fmt"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/ohsu-comp-bio/funnel/compute/scheduler"
"github.com/ohsu-comp-bio/funnel/tes"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)

// ReadQueue returns a slice of queued Tasks. Up to "n" tasks are returned.
func (db *MongoDB) ReadQueue(n int) []*tes.Task {
sess := db.sess.Copy()
defer sess.Close()

var tasks []*tes.Task
err := db.tasks.Find(bson.M{"state": tes.State_QUEUED}).Sort("creationtime").Select(basicView).Limit(n).All(&tasks)
err := db.tasks(sess).Find(bson.M{"state": tes.State_QUEUED}).Sort("creationtime").Select(basicView).Limit(n).All(&tasks)
if err != nil {
fmt.Println(err)
return nil
Expand All @@ -27,14 +30,18 @@ func (db *MongoDB) ReadQueue(n int) []*tes.Task {
// and status updates, such as completed tasks. The server responds with updated
// information for the node, such as canceled tasks.
func (db *MongoDB) PutNode(ctx context.Context, node *scheduler.Node) (*scheduler.PutNodeResponse, error) {
sess := db.sess.Copy()
defer sess.Close()
nodes := db.nodes(sess)

q := bson.M{"id": node.Id}

if node.GetVersion() != 0 {
q["version"] = node.GetVersion()
}

var existing scheduler.Node
err := db.nodes.Find(bson.M{"id": node.Id}).One(&existing)
err := nodes.Find(bson.M{"id": node.Id}).One(&existing)
if err != nil && err != mgo.ErrNotFound {
return nil, err
}
Expand All @@ -46,15 +53,18 @@ func (db *MongoDB) PutNode(ctx context.Context, node *scheduler.Node) (*schedule

node.Version = node.GetVersion() + 1

_, err = db.nodes.Upsert(q, node)
_, err = nodes.Upsert(q, node)

return &scheduler.PutNodeResponse{}, err
}

// GetNode gets a node
func (db *MongoDB) GetNode(ctx context.Context, req *scheduler.GetNodeRequest) (*scheduler.Node, error) {
sess := db.sess.Copy()
defer sess.Close()

var node scheduler.Node
err := db.nodes.Find(bson.M{"id": req.Id}).One(&node)
err := db.nodes(sess).Find(bson.M{"id": req.Id}).One(&node)
if err == mgo.ErrNotFound {
return nil, grpc.Errorf(codes.NotFound, fmt.Sprintf("%v: nodeID: %s", mgo.ErrNotFound.Error(), req.Id))
}
Expand All @@ -63,7 +73,10 @@ func (db *MongoDB) GetNode(ctx context.Context, req *scheduler.GetNodeRequest) (

// DeleteNode deletes a node
func (db *MongoDB) DeleteNode(ctx context.Context, req *scheduler.Node) (*scheduler.DeleteNodeResponse, error) {
err := db.nodes.Remove(bson.M{"id": req.Id})
sess := db.sess.Copy()
defer sess.Close()

err := db.nodes(sess).Remove(bson.M{"id": req.Id})
if err == mgo.ErrNotFound {
return nil, grpc.Errorf(codes.NotFound, fmt.Sprintf("%v: nodeID: %s", mgo.ErrNotFound.Error(), req.Id))
}
Expand All @@ -72,8 +85,11 @@ func (db *MongoDB) DeleteNode(ctx context.Context, req *scheduler.Node) (*schedu

// ListNodes is an API endpoint that returns a list of nodes.
func (db *MongoDB) ListNodes(ctx context.Context, req *scheduler.ListNodesRequest) (*scheduler.ListNodesResponse, error) {
sess := db.sess.Copy()
defer sess.Close()

var nodes []*scheduler.Node
err := db.nodes.Find(nil).All(&nodes)
err := db.nodes(sess).Find(nil).All(&nodes)
if err != nil {
return nil, err
}
Expand Down
14 changes: 10 additions & 4 deletions database/mongodb/tes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package mongodb
import (
"fmt"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/ohsu-comp-bio/funnel/tes"
"golang.org/x/net/context"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)

var basicView = bson.M{
Expand All @@ -19,10 +19,13 @@ var minimalView = bson.M{"id": 1, "state": 1}

// GetTask gets a task, which describes a running task
func (db *MongoDB) GetTask(ctx context.Context, req *tes.GetTaskRequest) (*tes.Task, error) {
sess := db.sess.Copy()
defer sess.Close()

var task tes.Task
var q *mgo.Query

q = db.tasks.Find(bson.M{"id": req.Id})
q = db.tasks(sess).Find(bson.M{"id": req.Id})
switch req.View {
case tes.TaskView_BASIC:
q = q.Select(basicView)
Expand All @@ -43,6 +46,9 @@ func (db *MongoDB) GetTask(ctx context.Context, req *tes.GetTaskRequest) (*tes.T

// ListTasks returns a list of taskIDs
func (db *MongoDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest) (*tes.ListTasksResponse, error) {
sess := db.sess.Copy()
defer sess.Close()

pageSize := tes.GetPageSize(req.GetPageSize())

var query = bson.M{}
Expand All @@ -60,7 +66,7 @@ func (db *MongoDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest) (*t
query[fmt.Sprintf("tags.%s", k)] = bson.M{"$eq": v}
}

q = db.tasks.Find(query).Sort("-creationtime").Limit(pageSize)
q = db.tasks(sess).Find(query).Sort("-creationtime").Limit(pageSize)

switch req.View {
case tes.TaskView_BASIC:
Expand Down
Loading