Skip to content

Commit

Permalink
Update TESv1.1 support
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott authored and lbeckman314 committed Nov 15, 2023
1 parent 0740633 commit bbfc616
Show file tree
Hide file tree
Showing 37 changed files with 5,429 additions and 3,039 deletions.
11 changes: 4 additions & 7 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
name: Go

on:
- pull_request
- push
on: [ pull_request ]

jobs:

Expand All @@ -17,8 +15,8 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.50.1

version: latest
args: --timeout 3m --verbose -D unused -D errcheck -D staticcheck -D govet -D gosimple -D ineffassign
build:
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -48,9 +46,8 @@ jobs:
go-version: ^1.18
- name: Check out code
uses: actions/checkout@v2

- name: Unit Tests
run: make test-verbose
run: make test

mongoTest:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ lint-depends:

# Run code style and other checks
lint:
@golangci-lint run --timeout 3m --disable-all --enable=golint --enable=gofmt --enable=goimports --enable=misspell \
@golangci-lint run --timeout 3m --disable-all --enable=vet --enable=golint --enable=gofmt --enable=goimports --enable=misspell \
--skip-dirs "vendor" \
--skip-dirs "webdash" \
--skip-dirs "cmd/webdash" \
Expand Down
15 changes: 8 additions & 7 deletions cmd/task/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// List runs the "task list" CLI command, which connects to the server,
// calls ListTasks() and requests the given task view.
// Output is written to the given writer.
func List(server, taskView, pageToken, stateFilter string, tagsFilter []string, pageSize int32, all bool, writer io.Writer) error {
func List(server, taskView, pageToken, stateFilter string, tagsFilter []string, namePrefix string, pageSize int32, all bool, writer io.Writer) error {
cli, err := tes.NewClient(server)
if err != nil {
return err
Expand Down Expand Up @@ -43,12 +43,13 @@ func List(server, taskView, pageToken, stateFilter string, tagsFilter []string,

for {
req := &tes.ListTasksRequest{
View: taskView,
PageToken: pageToken,
PageSize: pageSize,
State: state,
TagKey: tagKeys,
TagValue: tagVals,
View: taskView,
PageToken: pageToken,
PageSize: pageSize,
State: state,
TagKey: tagKeys,
TagValue: tagVals,
NamePrefix: namePrefix,
}

resp, err := cli.ListTasks(context.Background(), req)
Expand Down
3 changes: 2 additions & 1 deletion cmd/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func newCommandHooks() (*cobra.Command, *hooks) {
lf.StringVarP(&pageToken, "page-token", "p", pageToken, "Page token")
lf.StringVar(&stateFilter, "state", stateFilter, "State filter")
lf.StringSliceVar(&tagsFilter, "tag", tagsFilter, "Tag filter. May be used multiple times to specify more than one tag")
lf.StringVar(&namePrefix, "name-prefix", namePrefix, "Name prefix")
lf.Int32VarP(&pageSize, "page-size", "s", pageSize, "Page size")
lf.BoolVar(&listAll, "all", listAll, "List all tasks")

Expand Down Expand Up @@ -123,7 +124,7 @@ func newCommandHooks() (*cobra.Command, *hooks) {
type hooks struct {
Create func(server string, messages []string, r io.Reader, w io.Writer) error
Get func(server string, ids []string, view string, w io.Writer) error
List func(server, view, pageToken, stateFilter string, tagsFilter []string, pageSize int32, all bool, w io.Writer) error
List func(server, view, pageToken, stateFilter string, tagsFilter []string, namePrefix string, pageSize int32, all bool, w io.Writer) error
Cancel func(server string, ids []string, w io.Writer) error
Wait func(server string, ids []string) error
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestGetDefaultView(t *testing.T) {
func TestList(t *testing.T) {
cmd, h := newCommandHooks()

h.List = func(server, view, page, state string, tags []string, size int32, all bool, w io.Writer) error {
h.List = func(server, view, page, state string, tags []string, namePrefix string, size int32, all bool, w io.Writer) error {
if view != "FULL" {
t.Errorf("expected FULL view, got '%s'", view)
}
Expand All @@ -62,7 +62,7 @@ func TestServerDefault(t *testing.T) {
}
return nil
}
h.List = func(server, view, page, state string, tags []string, size int32, all bool, w io.Writer) error {
h.List = func(server, view, page, state string, tags []string, namePrefix string, size int32, all bool, w io.Writer) error {
if server != "http://localhost:8000" {
t.Errorf("expected localhost default, got '%s'", server)
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestServerEnv(t *testing.T) {
}
return nil
}
h.List = func(server, view, page, state string, tags []string, size int32, all bool, w io.Writer) error {
h.List = func(server, view, page, state string, tags []string, namePrefix string, size int32, all bool, w io.Writer) error {
if server != "foobar" {
t.Error("expected foobar")
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestServerFlagOverride(t *testing.T) {
}
return nil
}
h.List = func(server, view, page, state string, tags []string, size int32, all bool, w io.Writer) error {
h.List = func(server, view, page, state string, tags []string, namePrefix string, size int32, all bool, w io.Writer) error {
if server != "flagval" {
t.Error("expected flagval")
}
Expand Down
4 changes: 2 additions & 2 deletions compute/batch/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (b *Backend) Submit(task *tes.Task) error {
if ram > 0 {
req.ContainerOverrides.Memory = aws.Int64(ram)
req.ContainerOverrides.ResourceRequirements = []*batch.ResourceRequirement{
{
Type: aws.String("MEMORY"),
&batch.ResourceRequirement {
Type: aws.String("MEMORY"),
Value: aws.String(strconv.FormatInt(ram, 10)),
},
}
Expand Down
6 changes: 3 additions & 3 deletions compute/hpc_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type HPCBackend struct {
// are mapped to TES states along with an optional reason for this mapping.
// The Reconcile function can then use the response to update the task states
// and system logs to report errors reported by the backend.
MapStates func([]string) ([]*HPCTaskState, error)
ReconcileRate time.Duration
Log *logger.Logger
MapStates func([]string) ([]*HPCTaskState, error)
ReconcileRate time.Duration
Log *logger.Logger
backendParameters map[string]string
events.Computer
}
Expand Down
2 changes: 1 addition & 1 deletion compute/local/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (b *Backend) Submit(task *tes.Task) error {
return err
}

go func() {
go func() {
w.Run(ctx)
w.Close()
}()
Expand Down
2 changes: 1 addition & 1 deletion compute/scheduler/scheduler.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 17 additions & 10 deletions compute/scheduler/scheduler_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions database/boltdb/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ var ExecutorStderr = []byte("executor-stderr")
var Nodes = []byte("nodes")

// SysLogs defeines the name of a bucket with maps
//
// task ID -> tes.TaskLog.SystemLogs
// task ID -> tes.TaskLog.SystemLogs
var SysLogs = []byte("system-logs")

// BoltDB provides handlers for gRPC endpoints.
Expand Down
4 changes: 4 additions & 0 deletions database/boltdb/tes.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (taskBolt *BoltDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest
continue taskLoop
}

if !strings.HasPrefix(task.Name, req.NamePrefix) {
continue taskLoop
}

for k, v := range req.GetTags() {
tval, ok := task.Tags[k]
if !ok || (tval != v && v != "") {
Expand Down
2 changes: 1 addition & 1 deletion database/datastore/tes.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *Datastore) ListTasks(ctx context.Context, req *tes.ListTasksRequest) (*
}

for k, v := range req.GetTags() {
q = q.Filter("TagStrings =", encodeKV(k, v))
q = q.FilterField("TagStrings", "=", encodeKV(k, v))
}

var tasks []*tes.Task
Expand Down
10 changes: 5 additions & 5 deletions database/mongodb/counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ type stateCount struct {
// TaskStateCounts returns the number of tasks in each state.
func (db *MongoDB) TaskStateCounts(ctx context.Context) (map[string]int32, error) {
stateStage := bson.D{{
Key: "$sort", Value: bson.D{{Key: "state", Value: 1}},
"$sort", bson.D{{"state", 1}},
}}

groupStage := bson.D{
{Key: "$group", Value: bson.D{
{Key: "_id", Value: "$state"},
{Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}},
{"$group", bson.D{
{"_id", "$state"},
{"count", bson.D{{"$sum", 1}}},
},
}}
}}

cursor, err := db.tasks(db.client).Aggregate(context.TODO(), mongo.Pipeline{stateStage, groupStage})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions database/mongodb/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
task := req.GetTask()
task.Logs = []*tes.TaskLog{
{
Logs: []*tes.ExecutorLog{},
Metadata: map[string]string{},
Logs: []*tes.ExecutorLog{},
Metadata: map[string]string{},
SystemLogs: []string{},
},
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error {
// apply version restriction and set update
selector["version"] = current["version"]
update = bson.M{"$set": bson.M{"state": to, "version": time.Now().UnixNano()}}

result, err := tasks.UpdateOne(context.TODO(), selector, update)
if result.MatchedCount == 0 {
return tes.ErrConcurrentStateChange
Expand Down
19 changes: 9 additions & 10 deletions database/mongodb/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"

"github.com/globalsign/mgo"
"github.com/ohsu-comp-bio/funnel/compute/scheduler"
"github.com/ohsu-comp-bio/funnel/config"
"go.mongodb.org/mongo-driver/bson"
Expand All @@ -15,14 +14,14 @@ import (
// MongoDB provides an MongoDB database server backend.
type MongoDB struct {
scheduler.UnimplementedSchedulerServiceServer
sess *mgo.Session
client *mongo.Client
conf config.MongoDB
active bool
}

func NewMongoDB(conf config.MongoDB) (*MongoDB, error) {
client, err := mongo.Connect(
context.TODO(),
context.TODO(),
options.Client().ApplyURI(conf.Addrs[0]))

if err != nil {
Expand Down Expand Up @@ -61,7 +60,7 @@ func (db *MongoDB) Init() error {
case "tasks":
tasksFound = true
case "nodes":
nodesFound = true
nodesFound = true
}
}

Expand All @@ -72,11 +71,11 @@ func (db *MongoDB) Init() error {
}

indexModel := mongo.IndexModel{
Keys: bson.D{
{Key: "-id", Value: -1},
{Key: "-creationtime", Value: -1},
},
Options: options.Index().SetUnique(true).SetSparse(true),
Keys: bson.D{
{"-id", -1},
{"-creationtime", -1},
},
Options: options.Index().SetUnique(true).SetSparse(true),
}
_, err = tasks.Indexes().CreateOne(context.TODO(), indexModel)
if err != nil {
Expand All @@ -92,7 +91,7 @@ func (db *MongoDB) Init() error {

indexModel := mongo.IndexModel{
Keys: bson.D{
{Key: "-id", Value: -1},
{"-id", -1},
},
Options: options.Index().SetUnique(true).SetSparse(true),
}
Expand Down
Loading

0 comments on commit bbfc616

Please sign in to comment.