diff --git a/Gopkg.lock b/Gopkg.lock index ec585720..8fc30309 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -309,6 +309,20 @@ revision = "24acd523c756fd9728824cdfac66aad9d8982fb7" version = "v2.2.0" +[[projects]] + digest = "1:4b08116de0de75c041bb341686f0b139930f26cb84dfdf7641d435548114181d" + name = "github.com/globalsign/mgo" + packages = [ + ".", + "bson", + "internal/json", + "internal/sasl", + "internal/scram", + ] + pruneopts = "UT" + revision = "113d3961e7311526535a1ef7042196563d442761" + version = "r2018.06.15" + [[projects]] digest = "1:4afe9b10ebaf13d77f8bad44b2af5a55c17699b375601a6383a671b0713845b9" name = "github.com/go-ini/ini" @@ -943,20 +957,6 @@ revision = "41344da2231b913fa3d983840a57a6b1b7b631a1" version = "v1.12.0" -[[projects]] - branch = "v2" - digest = "1:5052389c6c809eeced44ec24ff9958bfea70b9859b7a25061c6fd9086c8f8989" - name = "gopkg.in/mgo.v2" - packages = [ - ".", - "bson", - "internal/json", - "internal/sasl", - "internal/scram", - ] - pruneopts = "UT" - revision = "3f83fa5005286a7fe593b055f0d7771a7dce4655" - [[projects]] digest = "1:418363915247a800f7449c1b43b983cbf99c9bd8b8c5094e68f509543437dc9f" name = "gopkg.in/olivere/elastic.v5" @@ -1009,6 +1009,8 @@ "github.com/getlantern/deepcopy", "github.com/ghodss/yaml", "github.com/gizak/termui", + "github.com/globalsign/mgo", + "github.com/globalsign/mgo/bson", "github.com/go-test/deep", "github.com/golang/gddo/httputil", "github.com/golang/protobuf/jsonpb", @@ -1050,8 +1052,6 @@ "google.golang.org/grpc/grpclog", "google.golang.org/grpc/metadata", "google.golang.org/grpc/status", - "gopkg.in/mgo.v2", - "gopkg.in/mgo.v2/bson", "gopkg.in/olivere/elastic.v5", ] solver-name = "gps-cdcl" diff --git a/Gopkg.toml b/Gopkg.toml index 511a1f3c..57a7388c 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -174,8 +174,8 @@ version = "1.12.0" [[constraint]] - branch = "v2" - name = "gopkg.in/mgo.v2" + version = "r2018.06.15" + name = "github.com/globalsign/mgo" [[constraint]] name = "gopkg.in/olivere/elastic.v5" diff --git a/database/mongodb/counts.go b/database/mongodb/counts.go index bc784e4e..d6ccf6e9 100644 --- a/database/mongodb/counts.go +++ b/database/mongodb/counts.go @@ -3,8 +3,8 @@ package mongodb import ( "context" + "github.com/globalsign/mgo/bson" "github.com/ohsu-comp-bio/funnel/tes" - "gopkg.in/mgo.v2/bson" ) type stateCount struct { @@ -14,7 +14,10 @@ type stateCount struct { // TaskStateCounts returns the number of tasks in each state. func (db *MongoDB) TaskStateCounts(ctx context.Context) (map[string]int32, error) { - pipe := db.tasks.Pipe([]bson.M{ + sess := db.sess.Copy() + defer sess.Close() + + pipe := db.tasks(sess).Pipe([]bson.M{ {"$group": bson.M{"_id": "$state", "count": bson.M{"$sum": 1}}}, }) diff --git a/database/mongodb/events.go b/database/mongodb/events.go index b7df2420..0ddd1f04 100644 --- a/database/mongodb/events.go +++ b/database/mongodb/events.go @@ -5,15 +5,19 @@ import ( "fmt" "time" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" "github.com/ohsu-comp-bio/funnel/events" "github.com/ohsu-comp-bio/funnel/tes" "github.com/ohsu-comp-bio/funnel/util" - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" ) // WriteEvent creates an event for the server to handle. func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error { + sess := db.sess.Copy() + defer sess.Close() + tasks := db.tasks(sess) + update := bson.M{} selector := bson.M{"id": req.Id} @@ -25,7 +29,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error { Logs: []*tes.ExecutorLog{}, }, } - return db.tasks.Insert(&task) + return tasks.Insert(&task) case events.Type_TASK_STATE: retrier := util.NewRetrier() @@ -39,7 +43,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error { return retrier.Retry(ctx, func() error { // get current state & version current := make(map[string]interface{}) - q := db.tasks.Find(bson.M{"id": req.Id}).Select(bson.M{"state": 1, "version": 1}) + q := tasks.Find(bson.M{"id": req.Id}).Select(bson.M{"state": 1, "version": 1}) err := q.One(¤t) if err == mgo.ErrNotFound { return tes.ErrNotFound @@ -58,7 +62,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error { // apply version restriction and set update selector["version"] = current["version"] update = bson.M{"$set": bson.M{"state": to, "version": time.Now().UnixNano()}} - return db.tasks.Update(selector, update) + return tasks.Update(selector, update) }) case events.Type_TASK_START_TIME: @@ -132,5 +136,5 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error { } } - return db.tasks.Update(selector, update) + return tasks.Update(selector, update) } diff --git a/database/mongodb/new.go b/database/mongodb/new.go index 63711bb4..713f24df 100644 --- a/database/mongodb/new.go +++ b/database/mongodb/new.go @@ -4,46 +4,56 @@ import ( "fmt" "time" + "github.com/globalsign/mgo" "github.com/ohsu-comp-bio/funnel/config" - "gopkg.in/mgo.v2" ) // MongoDB provides an MongoDB database server backend. type MongoDB struct { - sess *mgo.Session - conf config.MongoDB - tasks *mgo.Collection - nodes *mgo.Collection - // events *mgo.Collection + sess *mgo.Session + conf config.MongoDB } // NewMongoDB returns a new MongoDB instance. func NewMongoDB(conf config.MongoDB) (*MongoDB, error) { sess, err := mgo.DialWithInfo(&mgo.DialInfo{ - Addrs: conf.Addrs, - Username: conf.Username, - Password: conf.Password, - Database: conf.Database, - Timeout: time.Duration(conf.Timeout), - // DialServer: func(addr *mgo.ServerAddr) (net.Conn, error) { - // return tls.Dial("tcp", addr.String(), &tls.Config{}) - // }, + Addrs: conf.Addrs, + Username: conf.Username, + Password: conf.Password, + Database: conf.Database, + Timeout: time.Duration(conf.Timeout), + AppName: "funnel", + PoolLimit: 4096, + PoolTimeout: 0, // wait for connection to become available + MinPoolSize: 10, + MaxIdleTimeMS: 120000, // 2 min }) if err != nil { return nil, err } db := &MongoDB{ - sess: sess, - conf: conf, - tasks: sess.DB(conf.Database).C("tasks"), - nodes: sess.DB(conf.Database).C("nodes"), + sess: sess, + conf: conf, } return db, nil } +func (db *MongoDB) tasks(sess *mgo.Session) *mgo.Collection { + return sess.DB(db.conf.Database).C("tasks") +} + +func (db *MongoDB) nodes(sess *mgo.Session) *mgo.Collection { + return sess.DB(db.conf.Database).C("nodes") +} + // Init creates tables in MongoDB. func (db *MongoDB) Init() error { - names, err := db.sess.DB(db.conf.Database).CollectionNames() + sess := db.sess.Copy() + defer sess.Close() + tasks := db.tasks(sess) + nodes := db.nodes(sess) + + names, err := sess.DB(db.conf.Database).CollectionNames() if err != nil { return fmt.Errorf("error listing collection names in database %s: %v", db.conf.Database, err) } @@ -59,12 +69,12 @@ func (db *MongoDB) Init() error { } if !tasksFound { - err = db.tasks.Create(&mgo.CollectionInfo{}) + err = tasks.Create(&mgo.CollectionInfo{}) if err != nil { return fmt.Errorf("error creating tasks collection in database %s: %v", db.conf.Database, err) } - err = db.tasks.EnsureIndex(mgo.Index{ + err = tasks.EnsureIndex(mgo.Index{ Key: []string{"-id", "-creationtime"}, Unique: true, DropDups: true, @@ -77,12 +87,12 @@ func (db *MongoDB) Init() error { } if !nodesFound { - err = db.nodes.Create(&mgo.CollectionInfo{}) + err = nodes.Create(&mgo.CollectionInfo{}) if err != nil { return fmt.Errorf("error creating nodes collection in database %s: %v", db.conf.Database, err) } - err = db.nodes.EnsureIndex(mgo.Index{ + err = nodes.EnsureIndex(mgo.Index{ Key: []string{"id"}, Unique: true, DropDups: true, diff --git a/database/mongodb/scheduler.go b/database/mongodb/scheduler.go index 1ee482c9..d0fcc5be 100644 --- a/database/mongodb/scheduler.go +++ b/database/mongodb/scheduler.go @@ -3,19 +3,22 @@ package mongodb import ( "fmt" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" "github.com/ohsu-comp-bio/funnel/compute/scheduler" "github.com/ohsu-comp-bio/funnel/tes" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" ) // ReadQueue returns a slice of queued Tasks. Up to "n" tasks are returned. func (db *MongoDB) ReadQueue(n int) []*tes.Task { + sess := db.sess.Copy() + defer sess.Close() + var tasks []*tes.Task - err := db.tasks.Find(bson.M{"state": tes.State_QUEUED}).Sort("creationtime").Select(basicView).Limit(n).All(&tasks) + err := db.tasks(sess).Find(bson.M{"state": tes.State_QUEUED}).Sort("creationtime").Select(basicView).Limit(n).All(&tasks) if err != nil { fmt.Println(err) return nil @@ -27,6 +30,10 @@ func (db *MongoDB) ReadQueue(n int) []*tes.Task { // and status updates, such as completed tasks. The server responds with updated // information for the node, such as canceled tasks. func (db *MongoDB) PutNode(ctx context.Context, node *scheduler.Node) (*scheduler.PutNodeResponse, error) { + sess := db.sess.Copy() + defer sess.Close() + nodes := db.nodes(sess) + q := bson.M{"id": node.Id} if node.GetVersion() != 0 { @@ -34,7 +41,7 @@ func (db *MongoDB) PutNode(ctx context.Context, node *scheduler.Node) (*schedule } var existing scheduler.Node - err := db.nodes.Find(bson.M{"id": node.Id}).One(&existing) + err := nodes.Find(bson.M{"id": node.Id}).One(&existing) if err != nil && err != mgo.ErrNotFound { return nil, err } @@ -46,15 +53,18 @@ func (db *MongoDB) PutNode(ctx context.Context, node *scheduler.Node) (*schedule node.Version = node.GetVersion() + 1 - _, err = db.nodes.Upsert(q, node) + _, err = nodes.Upsert(q, node) return &scheduler.PutNodeResponse{}, err } // GetNode gets a node func (db *MongoDB) GetNode(ctx context.Context, req *scheduler.GetNodeRequest) (*scheduler.Node, error) { + sess := db.sess.Copy() + defer sess.Close() + var node scheduler.Node - err := db.nodes.Find(bson.M{"id": req.Id}).One(&node) + err := db.nodes(sess).Find(bson.M{"id": req.Id}).One(&node) if err == mgo.ErrNotFound { return nil, grpc.Errorf(codes.NotFound, fmt.Sprintf("%v: nodeID: %s", mgo.ErrNotFound.Error(), req.Id)) } @@ -63,7 +73,10 @@ func (db *MongoDB) GetNode(ctx context.Context, req *scheduler.GetNodeRequest) ( // DeleteNode deletes a node func (db *MongoDB) DeleteNode(ctx context.Context, req *scheduler.Node) (*scheduler.DeleteNodeResponse, error) { - err := db.nodes.Remove(bson.M{"id": req.Id}) + sess := db.sess.Copy() + defer sess.Close() + + err := db.nodes(sess).Remove(bson.M{"id": req.Id}) if err == mgo.ErrNotFound { return nil, grpc.Errorf(codes.NotFound, fmt.Sprintf("%v: nodeID: %s", mgo.ErrNotFound.Error(), req.Id)) } @@ -72,8 +85,11 @@ func (db *MongoDB) DeleteNode(ctx context.Context, req *scheduler.Node) (*schedu // ListNodes is an API endpoint that returns a list of nodes. func (db *MongoDB) ListNodes(ctx context.Context, req *scheduler.ListNodesRequest) (*scheduler.ListNodesResponse, error) { + sess := db.sess.Copy() + defer sess.Close() + var nodes []*scheduler.Node - err := db.nodes.Find(nil).All(&nodes) + err := db.nodes(sess).Find(nil).All(&nodes) if err != nil { return nil, err } diff --git a/database/mongodb/tes.go b/database/mongodb/tes.go index e6a5a77c..30968fd8 100644 --- a/database/mongodb/tes.go +++ b/database/mongodb/tes.go @@ -3,10 +3,10 @@ package mongodb import ( "fmt" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" "github.com/ohsu-comp-bio/funnel/tes" "golang.org/x/net/context" - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" ) var basicView = bson.M{ @@ -19,10 +19,13 @@ var minimalView = bson.M{"id": 1, "state": 1} // GetTask gets a task, which describes a running task func (db *MongoDB) GetTask(ctx context.Context, req *tes.GetTaskRequest) (*tes.Task, error) { + sess := db.sess.Copy() + defer sess.Close() + var task tes.Task var q *mgo.Query - q = db.tasks.Find(bson.M{"id": req.Id}) + q = db.tasks(sess).Find(bson.M{"id": req.Id}) switch req.View { case tes.TaskView_BASIC: q = q.Select(basicView) @@ -43,6 +46,9 @@ func (db *MongoDB) GetTask(ctx context.Context, req *tes.GetTaskRequest) (*tes.T // ListTasks returns a list of taskIDs func (db *MongoDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest) (*tes.ListTasksResponse, error) { + sess := db.sess.Copy() + defer sess.Close() + pageSize := tes.GetPageSize(req.GetPageSize()) var query = bson.M{} @@ -60,7 +66,7 @@ func (db *MongoDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest) (*t query[fmt.Sprintf("tags.%s", k)] = bson.M{"$eq": v} } - q = db.tasks.Find(query).Sort("-creationtime").Limit(pageSize) + q = db.tasks(sess).Find(query).Sort("-creationtime").Limit(pageSize) switch req.View { case tes.TaskView_BASIC: diff --git a/logger/grpc.go b/logger/grpc.go index 426259c5..77f0481f 100644 --- a/logger/grpc.go +++ b/logger/grpc.go @@ -24,42 +24,42 @@ type grpclogger struct { } func (g *grpclogger) Info(args ...interface{}) { - g.log.Debug(fmt.Sprint(args)) + g.log.Debug(fmt.Sprint(args...)) } func (g *grpclogger) Infoln(args ...interface{}) { - g.log.Debug(fmt.Sprint(args)) + g.log.Debug(fmt.Sprint(args...)) } func (g *grpclogger) Infof(format string, args ...interface{}) { - g.log.Debug(fmt.Sprintf(format, args)) + g.log.Debug(fmt.Sprintf(format, args...)) } func (g *grpclogger) Warning(args ...interface{}) { - g.log.Debug(fmt.Sprint(args)) + g.log.Debug(fmt.Sprint(args...)) } func (g *grpclogger) Warningln(args ...interface{}) { - g.log.Debug(fmt.Sprint(args)) + g.log.Debug(fmt.Sprint(args...)) } func (g *grpclogger) Warningf(format string, args ...interface{}) { - g.log.Debug(fmt.Sprintf(format, args)) + g.log.Debug(fmt.Sprintf(format, args...)) } func (g *grpclogger) Error(args ...interface{}) { - g.log.Error(fmt.Sprint(args)) + g.log.Error(fmt.Sprint(args...)) } func (g *grpclogger) Errorln(args ...interface{}) { - g.log.Error(fmt.Sprint(args)) + g.log.Error(fmt.Sprint(args...)) } func (g *grpclogger) Errorf(format string, args ...interface{}) { - g.log.Error(fmt.Sprintf(format, args)) + g.log.Error(fmt.Sprintf(format, args...)) } func (g *grpclogger) Fatal(args ...interface{}) { - g.log.Error(fmt.Sprint(args)) + g.log.Error(fmt.Sprint(args...)) os.Exit(1) } func (g *grpclogger) Fatalln(args ...interface{}) { - g.log.Error(fmt.Sprint(args)) + g.log.Error(fmt.Sprint(args...)) os.Exit(1) } func (g *grpclogger) Fatalf(format string, args ...interface{}) { - g.log.Error(fmt.Sprintf(format, args)) + g.log.Error(fmt.Sprintf(format, args...)) os.Exit(1) } func (g *grpclogger) V(l int) bool { diff --git a/server/server.go b/server/server.go index cb964a5e..251412b0 100644 --- a/server/server.go +++ b/server/server.go @@ -6,7 +6,7 @@ import ( "net/http" "github.com/golang/gddo/httputil" - "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/ohsu-comp-bio/funnel/compute/scheduler" "github.com/ohsu-comp-bio/funnel/config" diff --git a/server/tes.go b/server/tes.go index 1b95291b..3d45558a 100644 --- a/server/tes.go +++ b/server/tes.go @@ -43,7 +43,12 @@ func (ts *TaskService) CreateTask(ctx context.Context, task *tes.Task) (*tes.Cre } // dispatch to compute backend - go ts.Compute.WriteEvent(ctx, events.NewTaskCreated(task)) + go func() { + err := ts.Compute.WriteEvent(ctx, events.NewTaskCreated(task)) + if err != nil { + ts.Log.Error("error submitting task to compute backend", "taskID", task.Id, "error", err) + } + }() return &tes.CreateTaskResponse{Id: task.Id}, nil } diff --git a/storage/http_test.go b/storage/http_test.go index ad104e95..fba42461 100644 --- a/storage/http_test.go +++ b/storage/http_test.go @@ -17,7 +17,7 @@ func TestHTTP(t *testing.T) { } // test client timeout - err = store.UnsupportedOperations("https://fakeurl.com").Get + err = store.UnsupportedOperations("https://fakeurl1234.com").Get if err == nil { t.Error("Expected timeout error") } @@ -47,7 +47,7 @@ func TestHTTP(t *testing.T) { } // test Put not supported - _, err = store.Put(context.Background(), "https://fakeurl.com", "_test_download/test_https_download.html") + _, err = store.Put(context.Background(), "https://fakeurl1234.com", "_test_download/test_https_download.html") if err == nil { t.Error("Expected error for Put call") } diff --git a/util/rpc/rpc.go b/util/rpc/rpc.go index 6498b396..67c84096 100644 --- a/util/rpc/rpc.go +++ b/util/rpc/rpc.go @@ -6,7 +6,7 @@ import ( "math/rand" "time" - "github.com/grpc-ecosystem/go-grpc-middleware/retry" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/ohsu-comp-bio/funnel/config" "golang.org/x/net/context" "google.golang.org/grpc" diff --git a/webdash/http.go b/webdash/http.go index ea7757ba..6e47e2fa 100644 --- a/webdash/http.go +++ b/webdash/http.go @@ -3,7 +3,7 @@ package webdash import ( "net/http" - "github.com/elazarl/go-bindata-assetfs" + assetfs "github.com/elazarl/go-bindata-assetfs" ) var fs = &assetfs.AssetFS{