From f431a5ac86f14c27a1693c2b6a7fb5edbeef9a96 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Thu, 28 Jun 2018 17:20:47 -0700 Subject: [PATCH 1/7] cmd/worker: run task from file --- cmd/node/run.go | 14 +++-- cmd/worker/run.go | 111 ++++++++++++++++++++------------------ cmd/worker/worker.go | 18 ++++--- compute/local/backend.go | 26 ++++----- worker/file_taskreader.go | 63 ++++++++++++++++++++++ worker/interfaces.go | 4 +- worker/rpc_taskreader.go | 13 ++--- worker/taskreader.go | 15 +++--- worker/worker.go | 26 ++++++--- 9 files changed, 194 insertions(+), 96 deletions(-) create mode 100644 worker/file_taskreader.go diff --git a/cmd/node/run.go b/cmd/node/run.go index 1ae87e77a..ac099cfdb 100644 --- a/cmd/node/run.go +++ b/cmd/node/run.go @@ -18,12 +18,18 @@ import ( func Run(ctx context.Context, conf config.Config, log *logger.Logger) error { conf.Node.ID = scheduler.GenNodeID() - w, err := workerCmd.NewWorker(ctx, conf, log) - if err != nil { - return err + factory := func(ctx context.Context, taskID string) error { + w, err := workerCmd.NewWorker(ctx, conf, log, &workerCmd.WorkerOpts{ + TaskID: taskID, + }) + if err != nil { + return err + } + w.Run(ctx) + return nil } - n, err := scheduler.NewNodeProcess(ctx, conf, w.Run, log) + n, err := scheduler.NewNodeProcess(ctx, conf, factory, log) if err != nil { return err } diff --git a/cmd/worker/run.go b/cmd/worker/run.go index ae8ac9512..878aacf4d 100644 --- a/cmd/worker/run.go +++ b/cmd/worker/run.go @@ -18,16 +18,16 @@ import ( ) // Run runs the "worker run" command. -func Run(ctx context.Context, conf config.Config, log *logger.Logger, taskID string) error { - w, err := NewWorker(ctx, conf, log) +func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *WorkerOpts) error { + w, err := NewWorker(ctx, conf, log, opts) if err != nil { return err } - return w.Run(ctx, taskID) + return w.Run(ctx) } // NewWorker returns a new Funnel worker based on the given config. 
-func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*worker.DefaultWorker, error) { +func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts *WorkerOpts) (*worker.DefaultWorker, error) { log.Debug("NewWorker", "config", conf) var err error @@ -43,63 +43,72 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*wo eventWriterSet[strings.ToLower(w)] = nil } - for e := range eventWriterSet { - switch e { - case "log": - writer = &events.Logger{Log: log} - case "boltdb": - writer, err = events.NewRPCWriter(ctx, conf.RPCClient) - case "badger": - writer, err = events.NewRPCWriter(ctx, conf.RPCClient) - case "dynamodb": - writer, err = dynamodb.NewDynamoDB(conf.DynamoDB) + // TODO bad hack + if opts.TaskFile != "" { + writer = &events.Logger{Log: log} + } else { + for e := range eventWriterSet { + switch e { + case "log": + writer = &events.Logger{Log: log} + case "boltdb": + writer, err = events.NewRPCWriter(ctx, conf.RPCClient) + case "badger": + writer, err = events.NewRPCWriter(ctx, conf.RPCClient) + case "dynamodb": + writer, err = dynamodb.NewDynamoDB(conf.DynamoDB) + case "datastore": + writer, err = datastore.NewDatastore(conf.Datastore) + case "elastic": + writer, err = elastic.NewElastic(conf.Elastic) + case "kafka": + writer, err = events.NewKafkaWriter(ctx, conf.Kafka) + case "pubsub": + writer, err = events.NewPubSubWriter(ctx, conf.PubSub) + case "mongodb": + writer, err = mongodb.NewMongoDB(conf.MongoDB) + default: + err = fmt.Errorf("unknown event writer: %s", e) + } + if err != nil { + return nil, fmt.Errorf("error occurred while initializing the %s event writer: %v", e, err) + } + if writer != nil { + writers = append(writers, writer) + } + } + } + + writer = &events.SystemLogFilter{Writer: &writers, Level: conf.Logger.Level} + writer = &events.ErrLogger{Writer: writer, Log: log} + + if opts.TaskFile != "" { + reader = &worker.FileTaskReader{Path: opts.TaskFile} + } else { + 
switch strings.ToLower(conf.Database) { case "datastore": - writer, err = datastore.NewDatastore(conf.Datastore) + db, err = datastore.NewDatastore(conf.Datastore) + case "dynamodb": + db, err = dynamodb.NewDynamoDB(conf.DynamoDB) case "elastic": - writer, err = elastic.NewElastic(conf.Elastic) - case "kafka": - writer, err = events.NewKafkaWriter(ctx, conf.Kafka) - case "pubsub": - writer, err = events.NewPubSubWriter(ctx, conf.PubSub) + db, err = elastic.NewElastic(conf.Elastic) case "mongodb": - writer, err = mongodb.NewMongoDB(conf.MongoDB) + db, err = mongodb.NewMongoDB(conf.MongoDB) + case "boltdb": + reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID) + case "badger": + reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID) default: - err = fmt.Errorf("unknown event writer: %s", e) + err = fmt.Errorf("unknown database: '%s'", conf.Database) } if err != nil { - return nil, fmt.Errorf("error occurred while initializing the %s event writer: %v", e, err) + return nil, fmt.Errorf("failed to instantiate database client: %v", err) } - if writer != nil { - writers = append(writers, writer) + if reader == nil { + reader = worker.NewGenericTaskReader(db.GetTask, opts.TaskID) } } - writer = &events.SystemLogFilter{Writer: &writers, Level: conf.Logger.Level} - writer = &events.ErrLogger{Writer: writer, Log: log} - - switch strings.ToLower(conf.Database) { - case "datastore": - db, err = datastore.NewDatastore(conf.Datastore) - case "dynamodb": - db, err = dynamodb.NewDynamoDB(conf.DynamoDB) - case "elastic": - db, err = elastic.NewElastic(conf.Elastic) - case "mongodb": - db, err = mongodb.NewMongoDB(conf.MongoDB) - case "boltdb": - reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient) - case "badger": - reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient) - default: - err = fmt.Errorf("unknown database: '%s'", conf.Database) - } - if err != nil { - return nil, fmt.Errorf("failed to instantiate database client: %v", err) - 
} - if reader == nil { - reader = worker.NewGenericTaskReader(db.GetTask) - } - store, err := storage.NewMux(conf) if err != nil { return nil, fmt.Errorf("failed to instantiate Storage backend: %v", err) diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 545469e82..111a3621c 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -13,6 +13,11 @@ import ( "github.com/spf13/cobra" ) +type WorkerOpts struct { + TaskID string + TaskFile string +} + // NewCommand returns the worker command func NewCommand() *cobra.Command { cmd, _ := newCommandHooks() @@ -20,7 +25,7 @@ func NewCommand() *cobra.Command { } type hooks struct { - Run func(ctx context.Context, conf config.Config, log *logger.Logger, taskID string) error + Run func(ctx context.Context, conf config.Config, log *logger.Logger, opts *WorkerOpts) error } func newCommandHooks() (*cobra.Command, *hooks) { @@ -32,8 +37,8 @@ func newCommandHooks() (*cobra.Command, *hooks) { configFile string conf config.Config flagConf config.Config - taskID string ) + opts := &WorkerOpts{} cmd := &cobra.Command{ Use: "worker", @@ -59,8 +64,8 @@ func newCommandHooks() (*cobra.Command, *hooks) { Short: "Run a task directly, bypassing the server.", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { - if taskID == "" { - return fmt.Errorf("no taskID was provided") + if opts.TaskID == "" && opts.TaskFile == "" { + return fmt.Errorf("no task ID nor task file was provided") } log := logger.NewLogger("worker", conf.Logger) @@ -68,12 +73,13 @@ func newCommandHooks() (*cobra.Command, *hooks) { ctx, cancel := context.WithCancel(context.Background()) ctx = util.SignalContext(ctx, time.Millisecond*500, syscall.SIGINT, syscall.SIGTERM) defer cancel() - return hooks.Run(ctx, conf, log, taskID) + return hooks.Run(ctx, conf, log, opts) }, } f = run.Flags() - f.StringVarP(&taskID, "taskID", "t", taskID, "Task ID") + f.StringVarP(&opts.TaskID, "taskID", "t", opts.TaskID, "Task ID") + 
f.StringVarP(&opts.TaskFile, "taskFile", "f", opts.TaskFile, "Task file") cmd.AddCommand(run) return cmd, hooks diff --git a/compute/local/backend.go b/compute/local/backend.go index 4f8be2e07..606133e7f 100644 --- a/compute/local/backend.go +++ b/compute/local/backend.go @@ -12,23 +12,17 @@ import ( "github.com/ohsu-comp-bio/funnel/logger" "github.com/ohsu-comp-bio/funnel/tes" "github.com/ohsu-comp-bio/funnel/util" - "github.com/ohsu-comp-bio/funnel/worker" ) // NewBackend returns a new local Backend instance. func NewBackend(ctx context.Context, conf config.Config, log *logger.Logger) (*Backend, error) { - w, err := workerCmd.NewWorker(ctx, conf, log) - if err != nil { - return nil, err - } - return &Backend{conf, w, log}, nil + return &Backend{conf, log}, nil } // Backend represents the local backend. type Backend struct { - conf config.Config - worker *worker.DefaultWorker - log *logger.Logger + conf config.Config + log *logger.Logger } // WriteEvent writes an event to the compute backend. @@ -44,11 +38,19 @@ func (b *Backend) WriteEvent(ctx context.Context, ev *events.Event) error { // Submit submits a task. For the Local backend this results in the task // running immediately. 
func (b *Backend) Submit(task *tes.Task) error { + ctx, cancel := context.WithCancel(context.Background()) + ctx = util.SignalContext(ctx, time.Millisecond, syscall.SIGINT, syscall.SIGTERM) + + w, err := workerCmd.NewWorker(ctx, b.conf, b.log, &workerCmd.WorkerOpts{ + TaskID: task.Id, + }) + if err != nil { + return err + } + go func() { - ctx, cancel := context.WithCancel(context.Background()) - ctx = util.SignalContext(ctx, time.Millisecond, syscall.SIGINT, syscall.SIGTERM) defer cancel() - b.worker.Run(ctx, task.Id) + w.Run(ctx) }() return nil } diff --git a/worker/file_taskreader.go b/worker/file_taskreader.go new file mode 100644 index 000000000..ba43eff11 --- /dev/null +++ b/worker/file_taskreader.go @@ -0,0 +1,63 @@ +package worker + +import ( + "context" + "fmt" + "os" + + "github.com/golang/protobuf/jsonpb" + "github.com/ohsu-comp-bio/funnel/tes" +) + +type FileTaskReader struct { + Path string + task *tes.Task +} + +func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) { + if f.task != nil { + return f.task, nil + } + + err := f.load() + if err != nil { + return nil, err + } + + fmt.Printf("%#v\n", f.task) + return f.task, nil +} + +func (f *FileTaskReader) State(ctx context.Context) (tes.State, error) { + if f.task != nil { + return f.task.State, nil + } + + err := f.load() + if err != nil { + return tes.Unknown, err + } + + return f.task.State, nil +} + +func (f *FileTaskReader) load() error { + fh, err := os.Open(f.Path) + if err != nil { + return fmt.Errorf("opening task file: %v", err) + } + defer fh.Close() + + task := &tes.Task{} + err = jsonpb.Unmarshal(fh, task) + if err != nil { + return fmt.Errorf("unmarshaling task file: %v", err) + } + err = tes.InitTask(task) + if err != nil { + return fmt.Errorf("validating task: %v", err) + } + + f.task = task + return nil +} diff --git a/worker/interfaces.go b/worker/interfaces.go index 6deafa5fa..35df7d097 100644 --- a/worker/interfaces.go +++ b/worker/interfaces.go @@ -8,6 +8,6 @@ import 
( // TaskReader is a type which reads task information // during task execution. type TaskReader interface { - Task(ctx context.Context, taskID string) (*tes.Task, error) - State(ctx context.Context, taskID string) (tes.State, error) + Task(ctx context.Context) (*tes.Task, error) + State(ctx context.Context) (tes.State, error) } diff --git a/worker/rpc_taskreader.go b/worker/rpc_taskreader.go index 2921d8cf2..9e98a2d1a 100644 --- a/worker/rpc_taskreader.go +++ b/worker/rpc_taskreader.go @@ -12,30 +12,31 @@ import ( type RPCTaskReader struct { client tes.TaskServiceClient conn *grpc.ClientConn + taskID string } // NewRPCTaskReader returns a new RPC-based task reader. -func NewRPCTaskReader(ctx context.Context, conf config.RPCClient) (*RPCTaskReader, error) { +func NewRPCTaskReader(ctx context.Context, conf config.RPCClient, taskID string) (*RPCTaskReader, error) { conn, err := util.Dial(ctx, conf) if err != nil { return nil, err } cli := tes.NewTaskServiceClient(conn) - return &RPCTaskReader{cli, conn}, nil + return &RPCTaskReader{cli, conn, taskID}, nil } // Task returns the task descriptor. -func (r *RPCTaskReader) Task(ctx context.Context, taskID string) (*tes.Task, error) { +func (r *RPCTaskReader) Task(ctx context.Context) (*tes.Task, error) { return r.client.GetTask(ctx, &tes.GetTaskRequest{ - Id: taskID, + Id: r.taskID, View: tes.TaskView_FULL, }) } // State returns the current state of the task. -func (r *RPCTaskReader) State(ctx context.Context, taskID string) (tes.State, error) { +func (r *RPCTaskReader) State(ctx context.Context) (tes.State, error) { t, err := r.client.GetTask(ctx, &tes.GetTaskRequest{ - Id: taskID, + Id: r.taskID, View: tes.TaskView_MINIMAL, }) return t.GetState(), err diff --git a/worker/taskreader.go b/worker/taskreader.go index 8b90e12ce..d41edad96 100644 --- a/worker/taskreader.go +++ b/worker/taskreader.go @@ -7,26 +7,27 @@ import ( // GenericTaskReader provides read access to tasks. 
type GenericTaskReader struct { - get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error) + get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error) + taskID string } // NewGenericTaskReader returns a new generic task reader. -func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error)) *GenericTaskReader { - return &GenericTaskReader{get} +func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error), taskID string) *GenericTaskReader { + return &GenericTaskReader{get, taskID} } // Task returns the task descriptor. -func (r *GenericTaskReader) Task(ctx context.Context, taskID string) (*tes.Task, error) { +func (r *GenericTaskReader) Task(ctx context.Context) (*tes.Task, error) { return r.get(ctx, &tes.GetTaskRequest{ - Id: taskID, + Id: r.taskID, View: tes.TaskView_FULL, }) } // State returns the current state of the task. -func (r *GenericTaskReader) State(ctx context.Context, taskID string) (tes.State, error) { +func (r *GenericTaskReader) State(ctx context.Context) (tes.State, error) { t, err := r.get(ctx, &tes.GetTaskRequest{ - Id: taskID, + Id: r.taskID, View: tes.TaskView_MINIMAL, }) return t.GetState(), err diff --git a/worker/worker.go b/worker/worker.go index 9cf615a81..ec3913605 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -26,7 +26,7 @@ type DefaultWorker struct { } // Run runs the Worker. -func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) { +func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { // The code here is verbose, but simple; mainly loops and simple error checking. // @@ -44,9 +44,21 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) var run helper var task *tes.Task + task, run.syserr = r.TaskReader.Task(pctx) + // TODO if we failed to retrieve the task, we can't do anything useful. + // but, it's also difficult to report the failure usefully. 
+ if run.syserr != nil { + r.EventWriter.WriteEvent(pctx, events.NewSystemLog("unknown", 0, 0, "error", + "failed to get task. ID unknown", map[string]string{ + "error": run.syserr.Error(), + })) + runerr = run.syserr + return + } + // set up task specific utilities - event = events.NewTaskWriter(taskID, 0, r.EventWriter) - mapper = NewFileMapper(filepath.Join(r.Conf.WorkDir, taskID)) + event = events.NewTaskWriter(task.GetId(), 0, r.EventWriter) + mapper = NewFileMapper(filepath.Join(r.Conf.WorkDir, task.GetId())) event.Info("Version", version.LogFields()...) event.State(tes.State_INITIALIZING) @@ -56,8 +68,6 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) event.Metadata(map[string]string{"hostname": name}) } - task, run.syserr = r.TaskReader.Task(pctx, taskID) - // Run the final logging/state steps in a deferred function // to ensure they always run, even if there's a missed error. defer func() { @@ -95,7 +105,7 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) run.syserr = e }) - ctx := r.pollForCancel(pctx, taskID, func() { run.taskCanceled = true }) + ctx := r.pollForCancel(pctx, func() { run.taskCanceled = true }) run.ctx = ctx // Prepare file mapper, which maps task file URLs to host filesystem paths @@ -223,7 +233,7 @@ func (r *DefaultWorker) validate(mapper *FileMapper) error { return nil } -func (r *DefaultWorker) pollForCancel(pctx context.Context, taskID string, cancelCallback func()) context.Context { +func (r *DefaultWorker) pollForCancel(pctx context.Context, cancelCallback func()) context.Context { taskctx, cancel := context.WithCancel(pctx) // Start a goroutine that polls the server to watch for a canceled state. 
@@ -237,7 +247,7 @@ func (r *DefaultWorker) pollForCancel(pctx context.Context, taskID string, cance case <-taskctx.Done(): return case <-ticker.C: - state, _ := r.TaskReader.State(taskctx, taskID) + state, _ := r.TaskReader.State(taskctx) if tes.TerminalState(state) { cancel() cancelCallback() From 9bb8c233f5ceb237dc8ebd8721370b46f2c0be23 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Thu, 28 Jun 2018 17:30:12 -0700 Subject: [PATCH 2/7] cmd/worker: cleanup file-based task logging --- cmd/worker/run.go | 6 +++--- worker/file_taskreader.go | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/worker/run.go b/cmd/worker/run.go index 878aacf4d..e7d08d48a 100644 --- a/cmd/worker/run.go +++ b/cmd/worker/run.go @@ -73,11 +73,11 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts if err != nil { return nil, fmt.Errorf("error occurred while initializing the %s event writer: %v", e, err) } - if writer != nil { - writers = append(writers, writer) - } } } + if writer != nil { + writers = append(writers, writer) + } writer = &events.SystemLogFilter{Writer: &writers, Level: conf.Logger.Level} writer = &events.ErrLogger{Writer: writer, Log: log} diff --git a/worker/file_taskreader.go b/worker/file_taskreader.go index ba43eff11..c88a2b394 100644 --- a/worker/file_taskreader.go +++ b/worker/file_taskreader.go @@ -24,7 +24,6 @@ func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) { return nil, err } - fmt.Printf("%#v\n", f.task) return f.task, nil } From 260d4ac76a5ef5e068a0c2f0bbaedc7811f1404f Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Fri, 29 Jun 2018 10:14:37 -0700 Subject: [PATCH 3/7] cmd/worker: tidy and lint --- cmd/node/run.go | 2 +- cmd/worker/run.go | 4 ++-- cmd/worker/worker.go | 7 ++++--- compute/local/backend.go | 9 ++------- worker/file_taskreader.go | 7 +++++++ 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cmd/node/run.go b/cmd/node/run.go index ac099cfdb..c0492273a 100644 
--- a/cmd/node/run.go +++ b/cmd/node/run.go @@ -19,7 +19,7 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger) error { conf.Node.ID = scheduler.GenNodeID() factory := func(ctx context.Context, taskID string) error { - w, err := workerCmd.NewWorker(ctx, conf, log, &workerCmd.WorkerOpts{ + w, err := workerCmd.NewWorker(ctx, conf, log, &workerCmd.Options{ TaskID: taskID, }) if err != nil { diff --git a/cmd/worker/run.go b/cmd/worker/run.go index e7d08d48a..f9bdf9914 100644 --- a/cmd/worker/run.go +++ b/cmd/worker/run.go @@ -18,7 +18,7 @@ import ( ) // Run runs the "worker run" command. -func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *WorkerOpts) error { +func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { w, err := NewWorker(ctx, conf, log, opts) if err != nil { return err @@ -27,7 +27,7 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *Work } // NewWorker returns a new Funnel worker based on the given config. -func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts *WorkerOpts) (*worker.DefaultWorker, error) { +func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) (*worker.DefaultWorker, error) { log.Debug("NewWorker", "config", conf) var err error diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 111a3621c..344f547cd 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -13,7 +13,8 @@ import ( "github.com/spf13/cobra" ) -type WorkerOpts struct { +// Options holds a few CLI options for worker entrypoints. 
+type Options struct { TaskID string TaskFile string } @@ -25,7 +26,7 @@ func NewCommand() *cobra.Command { } type hooks struct { - Run func(ctx context.Context, conf config.Config, log *logger.Logger, opts *WorkerOpts) error + Run func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error } func newCommandHooks() (*cobra.Command, *hooks) { @@ -38,7 +39,7 @@ func newCommandHooks() (*cobra.Command, *hooks) { conf config.Config flagConf config.Config ) - opts := &WorkerOpts{} + opts := &Options{} cmd := &cobra.Command{ Use: "worker", diff --git a/compute/local/backend.go b/compute/local/backend.go index 606133e7f..9457bf456 100644 --- a/compute/local/backend.go +++ b/compute/local/backend.go @@ -3,15 +3,12 @@ package local import ( "context" - "syscall" - "time" workerCmd "github.com/ohsu-comp-bio/funnel/cmd/worker" "github.com/ohsu-comp-bio/funnel/config" "github.com/ohsu-comp-bio/funnel/events" "github.com/ohsu-comp-bio/funnel/logger" "github.com/ohsu-comp-bio/funnel/tes" - "github.com/ohsu-comp-bio/funnel/util" ) // NewBackend returns a new local Backend instance. @@ -38,10 +35,9 @@ func (b *Backend) WriteEvent(ctx context.Context, ev *events.Event) error { // Submit submits a task. For the Local backend this results in the task // running immediately. 
func (b *Backend) Submit(task *tes.Task) error { - ctx, cancel := context.WithCancel(context.Background()) - ctx = util.SignalContext(ctx, time.Millisecond, syscall.SIGINT, syscall.SIGTERM) + ctx := context.Background() - w, err := workerCmd.NewWorker(ctx, b.conf, b.log, &workerCmd.WorkerOpts{ + w, err := workerCmd.NewWorker(ctx, b.conf, b.log, &workerCmd.Options{ TaskID: task.Id, }) if err != nil { @@ -49,7 +45,6 @@ func (b *Backend) Submit(task *tes.Task) error { } go func() { - defer cancel() w.Run(ctx) }() return nil diff --git a/worker/file_taskreader.go b/worker/file_taskreader.go index c88a2b394..e0726665a 100644 --- a/worker/file_taskreader.go +++ b/worker/file_taskreader.go @@ -9,11 +9,13 @@ import ( "github.com/ohsu-comp-bio/funnel/tes" ) +// FileTaskReader provides a TaskReader implementation from a task file. type FileTaskReader struct { Path string task *tes.Task } +// Task returns the task. A random ID will be generated. func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) { if f.task != nil { return f.task, nil @@ -27,7 +29,12 @@ func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) { return f.task, nil } +// State returns the task state. Due to some quirks in the implementation +// of this reader, this will always return QUEUED. func (f *FileTaskReader) State(ctx context.Context) (tes.State, error) { + // TODO tes.InitTask is being used internally, which initializes the + // task to QUEUED. So this will always return queued. 
+ if f.task != nil { return f.task.State, nil } From a56754f7878318f0928de8fa097930cc2ee56239 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Mon, 9 Jul 2018 09:57:07 -0700 Subject: [PATCH 4/7] worker: file/string task readers --- cmd/worker/worker.go | 6 ++-- worker/base64_taskreader.go | 42 ++++++++++++++++++++++++++ worker/file_taskreader.go | 59 ++++++++++++------------------------- 3 files changed, 65 insertions(+), 42 deletions(-) create mode 100644 worker/base64_taskreader.go diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 344f547cd..45e0eb420 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -17,6 +17,7 @@ import ( type Options struct { TaskID string TaskFile string + TaskBase64 string } // NewCommand returns the worker command @@ -65,8 +66,8 @@ func newCommandHooks() (*cobra.Command, *hooks) { Short: "Run a task directly, bypassing the server.", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { - if opts.TaskID == "" && opts.TaskFile == "" { - return fmt.Errorf("no task ID nor task file was provided") + if opts.TaskID == "" && opts.TaskFile == "" && opts.TaskBase64 == "" { + return fmt.Errorf("no task was provided") } log := logger.NewLogger("worker", conf.Logger) @@ -81,6 +82,7 @@ func newCommandHooks() (*cobra.Command, *hooks) { f = run.Flags() f.StringVarP(&opts.TaskID, "taskID", "t", opts.TaskID, "Task ID") f.StringVarP(&opts.TaskFile, "taskFile", "f", opts.TaskFile, "Task file") + f.StringVarP(&opts.TaskBase64, "taskBase64", "b", opts.TaskBase64, "Task base64") cmd.AddCommand(run) return cmd, hooks diff --git a/worker/base64_taskreader.go b/worker/base64_taskreader.go new file mode 100644 index 000000000..68678a76a --- /dev/null +++ b/worker/base64_taskreader.go @@ -0,0 +1,42 @@ +package worker + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + + "github.com/golang/protobuf/jsonpb" + "github.com/ohsu-comp-bio/funnel/tes" +) + +type Base64TaskReader struct { + task *tes.Task +} + 
+func NewBase64TaskReader(raw string) (*Base64TaskReader, error) { + data, err := base64.StdEncoding.DecodeString(raw) + if err != nil { + return nil, fmt.Errorf("decoding task: %v", err) + } + + task := &tes.Task{} + buf := bytes.NewBuffer(data) + err = jsonpb.Unmarshal(buf, task) + if err != nil { + return nil, fmt.Errorf("unmarshaling task: %v", err) + } + return &Base64TaskReader{task}, nil +} + +// Task returns the task. A random ID will be generated. +func (f *Base64TaskReader) Task(ctx context.Context) (*tes.Task, error) { + return f.task, nil +} + +// State returns the task state. Due to some quirks in the implementation +// of this reader, and since there is no online database to connect to, +// this will always return QUEUED. +func (f *Base64TaskReader) State(ctx context.Context) (tes.State, error) { + return f.task.GetState(), nil +} diff --git a/worker/file_taskreader.go b/worker/file_taskreader.go index e0726665a..5edc77098 100644 --- a/worker/file_taskreader.go +++ b/worker/file_taskreader.go @@ -11,59 +11,38 @@ import ( // FileTaskReader provides a TaskReader implementation from a task file. type FileTaskReader struct { - Path string task *tes.Task } -// Task returns the task. A random ID will be generated. -func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) { - if f.task != nil { - return f.task, nil - } - - err := f.load() - if err != nil { - return nil, err - } - - return f.task, nil -} - -// State returns the task state. Due to some quirks in the implementation -// of this reader, this will always return QUEUED. -func (f *FileTaskReader) State(ctx context.Context) (tes.State, error) { - // TODO tes.InitTask is being used internally, which initializes the - // task to QUEUED. So this will always return queued. 
- - if f.task != nil { - return f.task.State, nil - } - - err := f.load() +func NewFileTaskReader(path string) (*FileTaskReader, error) { + fh, err := os.Open(path) if err != nil { - return tes.Unknown, err - } - - return f.task.State, nil -} - -func (f *FileTaskReader) load() error { - fh, err := os.Open(f.Path) - if err != nil { - return fmt.Errorf("opening task file: %v", err) + return nil, fmt.Errorf("opening task file: %v", err) } defer fh.Close() task := &tes.Task{} err = jsonpb.Unmarshal(fh, task) if err != nil { - return fmt.Errorf("unmarshaling task file: %v", err) + return nil, fmt.Errorf("unmarshaling task file: %v", err) } + err = tes.InitTask(task) if err != nil { - return fmt.Errorf("validating task: %v", err) + return nil, fmt.Errorf("validating task: %v", err) } - f.task = task - return nil + return &FileTaskReader{task}, nil +} + +// Task returns the task. A random ID will be generated. +func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) { + return f.task, nil +} + +// State returns the task state. Due to some quirks in the implementation +// of this reader, and since there is no online database to connect to, +// this will always return QUEUED. 
+func (f *FileTaskReader) State(ctx context.Context) (tes.State, error) { + return f.task.GetState(), nil } From bb90914ab310700d3bb73b1db12f9b7eb94c2199 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Mon, 9 Jul 2018 12:03:50 -0700 Subject: [PATCH 5/7] cmd/worker: refactor init, handle file/string task reader source --- cmd/worker/run.go | 222 ++++++++++++++++++++++++------------ cmd/worker/worker.go | 6 +- server/tes.go | 2 +- tes/utils.go | 18 ++- worker/base64_taskreader.go | 16 ++- worker/file_taskreader.go | 8 +- 6 files changed, 183 insertions(+), 89 deletions(-) diff --git a/cmd/worker/run.go b/cmd/worker/run.go index f9bdf9914..b7db27620 100644 --- a/cmd/worker/run.go +++ b/cmd/worker/run.go @@ -14,6 +14,7 @@ import ( "github.com/ohsu-comp-bio/funnel/logger" "github.com/ohsu-comp-bio/funnel/storage" "github.com/ohsu-comp-bio/funnel/tes" + "github.com/ohsu-comp-bio/funnel/util" "github.com/ohsu-comp-bio/funnel/worker" ) @@ -30,97 +31,174 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *Opti func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) (*worker.DefaultWorker, error) { log.Debug("NewWorker", "config", conf) - var err error - var db tes.ReadOnlyServer - var reader worker.TaskReader - var writer events.Writer - var writers events.MultiWriter - - eventWriterSet := map[string]interface{}{ - strings.ToLower(conf.Database): nil, - } - for _, w := range conf.EventWriters { - eventWriterSet[strings.ToLower(w)] = nil + err := validateConfig(conf, opts) + if err != nil { + return nil, fmt.Errorf("validating config: %v", err) } - // TODO bad hack - if opts.TaskFile != "" { - writer = &events.Logger{Log: log} - } else { - for e := range eventWriterSet { - switch e { - case "log": - writer = &events.Logger{Log: log} - case "boltdb": - writer, err = events.NewRPCWriter(ctx, conf.RPCClient) - case "badger": - writer, err = events.NewRPCWriter(ctx, conf.RPCClient) - case "dynamodb": - writer, err 
= dynamodb.NewDynamoDB(conf.DynamoDB) - case "datastore": - writer, err = datastore.NewDatastore(conf.Datastore) - case "elastic": - writer, err = elastic.NewElastic(conf.Elastic) - case "kafka": - writer, err = events.NewKafkaWriter(ctx, conf.Kafka) - case "pubsub": - writer, err = events.NewPubSubWriter(ctx, conf.PubSub) - case "mongodb": - writer, err = mongodb.NewMongoDB(conf.MongoDB) - default: - err = fmt.Errorf("unknown event writer: %s", e) - } - if err != nil { - return nil, fmt.Errorf("error occurred while initializing the %s event writer: %v", e, err) - } - } + // Construct a set of event writers based on the config. + builder := eventWriterBuilder{} + // If the task comes from a file or string, + // don't assume we should write to the database. + if opts.TaskFile == "" && opts.TaskBase64 == "" { + builder.Add(ctx, conf.Database, conf, log) } - if writer != nil { - writers = append(writers, writer) + // Add configured event writers. + for _, e := range conf.EventWriters { + builder.Add(ctx, e, conf, log) + } + // Get the built writer. + writer, err := builder.Writer() + if err != nil { + return nil, fmt.Errorf("creating event writers: %v", err) } - writer = &events.SystemLogFilter{Writer: &writers, Level: conf.Logger.Level} + // Wrap the event writers in a couple filters. 
+ writer = &events.SystemLogFilter{Writer: writer, Level: conf.Logger.Level} writer = &events.ErrLogger{Writer: writer, Log: log} - if opts.TaskFile != "" { - reader = &worker.FileTaskReader{Path: opts.TaskFile} - } else { - switch strings.ToLower(conf.Database) { - case "datastore": - db, err = datastore.NewDatastore(conf.Datastore) - case "dynamodb": - db, err = dynamodb.NewDynamoDB(conf.DynamoDB) - case "elastic": - db, err = elastic.NewElastic(conf.Elastic) - case "mongodb": - db, err = mongodb.NewMongoDB(conf.MongoDB) - case "boltdb": - reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID) - case "badger": - reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID) - default: - err = fmt.Errorf("unknown database: '%s'", conf.Database) - } - if err != nil { - return nil, fmt.Errorf("failed to instantiate database client: %v", err) - } - if reader == nil { - reader = worker.NewGenericTaskReader(db.GetTask, opts.TaskID) - } + // Get the task source reader: database, file, etc. + reader, err := newTaskReader(ctx, conf, opts) + if err != nil { + return nil, fmt.Errorf("creating task reader: %v", err) } + // Initialize task storage client. store, err := storage.NewMux(conf) if err != nil { return nil, fmt.Errorf("failed to instantiate Storage backend: %v", err) } store.AttachLogger(log) - w := &worker.DefaultWorker{ + return &worker.DefaultWorker{ Conf: conf.Worker, Store: store, TaskReader: reader, EventWriter: writer, + }, nil +} + +// newTaskReader finds a TaskReader implementation that matches the config +// and commandline options. +func newTaskReader(ctx context.Context, conf config.Config, opts *Options) (worker.TaskReader, error) { + + switch { + // These readers are used to read a local task from a file, cli arg, etc. 
+ case opts.TaskFile != "": + return worker.NewFileTaskReader(opts.TaskFile) + + case opts.TaskBase64 != "": + return worker.NewBase64TaskReader(opts.TaskBase64) + } + + switch strings.ToLower(conf.Database) { + // These readers will connect to the configured task database. + case "datastore": + db, err := datastore.NewDatastore(conf.Datastore) + return newDatabaseTaskReader(opts.TaskID, db, err) + + case "dynamodb": + db, err := dynamodb.NewDynamoDB(conf.DynamoDB) + return newDatabaseTaskReader(opts.TaskID, db, err) + + case "elastic": + db, err := elastic.NewElastic(conf.Elastic) + return newDatabaseTaskReader(opts.TaskID, db, err) + + case "mongodb": + db, err := mongodb.NewMongoDB(conf.MongoDB) + return newDatabaseTaskReader(opts.TaskID, db, err) + + // These readers connect via RPC (because the database is embedded in the server). + case "boltdb", "badger": + return worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID) + + // No matching reader. Fail. + default: + return nil, fmt.Errorf("no matching task reader found") } +} - return w, nil +// newDatabaseTaskReader helps create a generic task reader wrapper +// for the given database backend. +func newDatabaseTaskReader(taskID string, db tes.ReadOnlyServer, err error) (worker.TaskReader, error) { + if err != nil { + return nil, fmt.Errorf("creating database task reader: %v", err) + } + return worker.NewGenericTaskReader(db.GetTask, taskID), nil +} + +// eventWriterBuilder is a helper for building a set of event writers, +// collecting errors, de-duplicating config, etc. +type eventWriterBuilder struct { + errors util.MultiError + writers events.MultiWriter + // seen tracks which event writers have already been built, + // so we don't build the same one twice. + seen map[string]bool +} + +// Writer returns the combined event writer and any errors collected by multiple calls to Add().
+func (e *eventWriterBuilder) Writer() (events.Writer, error) { + return &e.writers, e.errors.ToError() +} + +// Add creates a new event writer by name and adds it to the builder. +func (e *eventWriterBuilder) Add(ctx context.Context, name string, conf config.Config, log *logger.Logger) { + if name == "" { + return + } + + if e.seen == nil { + e.seen = map[string]bool{} + } + + // If we've already created this event writer "name", skip it. + if _, ok := e.seen[name]; ok { + return + } + e.seen[name] = true + + var err error + var writer events.Writer + + switch name { + case "log": + writer = &events.Logger{Log: log} + case "boltdb", "badger": + writer, err = events.NewRPCWriter(ctx, conf.RPCClient) + case "dynamodb": + writer, err = dynamodb.NewDynamoDB(conf.DynamoDB) + case "datastore": + writer, err = datastore.NewDatastore(conf.Datastore) + case "elastic": + writer, err = elastic.NewElastic(conf.Elastic) + case "kafka": + writer, err = events.NewKafkaWriter(ctx, conf.Kafka) + case "pubsub": + writer, err = events.NewPubSubWriter(ctx, conf.PubSub) + case "mongodb": + writer, err = mongodb.NewMongoDB(conf.MongoDB) + default: + err = fmt.Errorf("unknown event writer: %s", name) + } + + if err != nil { + e.errors = append(e.errors, err) + } else { + e.writers = append(e.writers, writer) + } +} + +func validateConfig(conf config.Config, opts *Options) error { + // If the task reader is a file or string, + // only a subset of event writers are supported. + if opts.TaskFile != "" || opts.TaskBase64 != "" { + for _, e := range conf.EventWriters { + if e != "log" && e != "kafka" && e != "pubsub" { + return fmt.Errorf("event writer %q is not supported with a task file/string reader", e) + } + } + } + return nil } diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 45e0eb420..c70e41230 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -15,9 +15,9 @@ import ( // Options holds a few CLI options for worker entrypoints. 
type Options struct { - TaskID string - TaskFile string - TaskBase64 string + TaskID string + TaskFile string + TaskBase64 string } // NewCommand returns the worker command diff --git a/server/tes.go b/server/tes.go index f9dc941c0..1b95291b1 100644 --- a/server/tes.go +++ b/server/tes.go @@ -34,7 +34,7 @@ type TaskService struct { // This is part of the TES implementation. func (ts *TaskService) CreateTask(ctx context.Context, task *tes.Task) (*tes.CreateTaskResponse, error) { - if err := tes.InitTask(task); err != nil { + if err := tes.InitTask(task, true); err != nil { return nil, grpc.Errorf(codes.InvalidArgument, err.Error()) } diff --git a/tes/utils.go b/tes/utils.go index a146d8ce8..5a1463392 100644 --- a/tes/utils.go +++ b/tes/utils.go @@ -46,10 +46,20 @@ func GenerateID() string { // such as Id, CreationTime, State, etc. If the task fails validation, // an error is returned. See Validate(). // The given task is modified. -func InitTask(task *Task) error { - task.Id = GenerateID() - task.State = Queued - task.CreationTime = time.Now().Format(time.RFC3339Nano) +// +// If "overwrite" is true, the fields Id, State, and CreationTime +// will always be overwritten, even if already set, otherwise they +// will only be set if they are empty. 
+func InitTask(task *Task, overwrite bool) error { + if overwrite || task.Id == "" { + task.Id = GenerateID() + } + if overwrite || task.State == Unknown { + task.State = Queued + } + if overwrite || task.CreationTime == "" { + task.CreationTime = time.Now().Format(time.RFC3339Nano) + } if err := Validate(task); err != nil { return fmt.Errorf("invalid task message:\n%s", err) } diff --git a/worker/base64_taskreader.go b/worker/base64_taskreader.go index 68678a76a..eb545e2b6 100644 --- a/worker/base64_taskreader.go +++ b/worker/base64_taskreader.go @@ -1,9 +1,9 @@ package worker import ( - "bytes" + "bytes" "context" - "encoding/base64" + "encoding/base64" "fmt" "github.com/golang/protobuf/jsonpb" @@ -15,18 +15,24 @@ type Base64TaskReader struct { } func NewBase64TaskReader(raw string) (*Base64TaskReader, error) { - data, err := base64.StdEncoding.DecodeString(raw) + data, err := base64.StdEncoding.DecodeString(raw) if err != nil { return nil, fmt.Errorf("decoding task: %v", err) } task := &tes.Task{} - buf := bytes.NewBuffer(data) + buf := bytes.NewBuffer(data) err = jsonpb.Unmarshal(buf, task) if err != nil { return nil, fmt.Errorf("unmarshaling task: %v", err) } - return &Base64TaskReader{task}, nil + + err = tes.InitTask(task, false) + if err != nil { + return nil, fmt.Errorf("initializing task: %v", err) + } + + return &Base64TaskReader{task}, nil } // Task returns the task. A random ID will be generated. 
diff --git a/worker/file_taskreader.go b/worker/file_taskreader.go index 5edc77098..5feb64863 100644 --- a/worker/file_taskreader.go +++ b/worker/file_taskreader.go @@ -27,12 +27,12 @@ func NewFileTaskReader(path string) (*FileTaskReader, error) { return nil, fmt.Errorf("unmarshaling task file: %v", err) } - err = tes.InitTask(task) + err = tes.InitTask(task, false) if err != nil { - return nil, fmt.Errorf("validating task: %v", err) + return nil, fmt.Errorf("initializing task: %v", err) } - return &FileTaskReader{task}, nil + return &FileTaskReader{task}, nil } // Task returns the task. A random ID will be generated. @@ -44,5 +44,5 @@ func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) { // of this reader, and since there is no online database to connect to, // this will always return QUEUED. func (f *FileTaskReader) State(ctx context.Context) (tes.State, error) { - return f.task.GetState(), nil + return f.task.GetState(), nil } From bf72c62be43414d8c775c983afa846e906ee75d4 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Mon, 9 Jul 2018 14:10:32 -0700 Subject: [PATCH 6/7] worker: fix tests for new factory interface --- cmd/worker/worker_test.go | 2 +- tests/core/basic_test.go | 31 ++++++++++++++++++++----------- tests/core/worker_test.go | 20 ++++++++++---------- tests/kafka/worker_test.go | 2 +- tests/pubsub/pubsub_test.go | 2 +- tests/scheduler/node_test.go | 13 +++++++++---- worker/base64_taskreader.go | 2 ++ worker/file_taskreader.go | 1 + 8 files changed, 45 insertions(+), 28 deletions(-) diff --git a/cmd/worker/worker_test.go b/cmd/worker/worker_test.go index 6b637dd70..20d3466c6 100644 --- a/cmd/worker/worker_test.go +++ b/cmd/worker/worker_test.go @@ -24,7 +24,7 @@ func TestPersistentPreRun(t *testing.T) { defer cleanup() c, h := newCommandHooks() - h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, taskID string) error { + h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) 
error { if conf.Server.HostName != host { t.Fatal("unexpected Server.HostName in config", conf.Server.HostName) } diff --git a/tests/core/basic_test.go b/tests/core/basic_test.go index 33d9ef99a..1d9ec6ed5 100644 --- a/tests/core/basic_test.go +++ b/tests/core/basic_test.go @@ -1125,26 +1125,34 @@ func TestConcurrentStateUpdate(t *testing.T) { f := tests.NewFunnel(c) f.StartServer() - w, err := workerCmd.NewWorker(ctx, c, log) - if err != nil { - t.Fatal(err) - } - e := w.EventWriter - ids := []string{} for i := 0; i < 10; i++ { id := f.Run(`--sh 'echo hello'`) ids = append(ids, id) + go func(id string) { + opts := &workerCmd.Options{TaskID: id} + w, err := workerCmd.NewWorker(ctx, c, log, opts) + if err != nil { + t.Fatal(err) + } + log.Info("writing state initializing event", "taskID", id) - err := e.WriteEvent(ctx, events.NewState(id, tes.Initializing)) + err = w.EventWriter.WriteEvent(ctx, events.NewState(id, tes.Initializing)) if err != nil { log.Error("error writing event", err) } }(id) + go func(id string) { + opts := &workerCmd.Options{TaskID: id} + w, err := workerCmd.NewWorker(ctx, c, log, opts) + if err != nil { + t.Fatal(err) + } + log.Info("writing state canceled event", "taskID", id) - err := e.WriteEvent(ctx, events.NewState(id, tes.Canceled)) + err = w.EventWriter.WriteEvent(ctx, events.NewState(id, tes.Canceled)) if err != nil { log.Error("error writing event", "error", err, "taskID", id) } @@ -1169,12 +1177,13 @@ func TestMetadataEvent(t *testing.T) { f := tests.NewFunnel(c) f.StartServer() - w, err := workerCmd.NewWorker(ctx, c, log) + id := f.Run(`--sh 'echo hello'`) + + w, err := workerCmd.NewWorker(ctx, c, log, &workerCmd.Options{TaskID: id}) if err != nil { t.Fatal(err) } e := w.EventWriter - id := f.Run(`--sh 'echo hello'`) err = e.WriteEvent(ctx, events.NewMetadata(id, 0, map[string]string{"one": "two"})) if err != nil { @@ -1185,7 +1194,7 @@ func TestMetadataEvent(t *testing.T) { t.Error("error writing event", "error", err, "taskID", id) } 
- err = w.Run(ctx, id) + err = w.Run(ctx) if err != nil { t.Error("error running task", "error", err, "taskID", id) } diff --git a/tests/core/worker_test.go b/tests/core/worker_test.go index d8b05c8ea..ea0b8ed63 100644 --- a/tests/core/worker_test.go +++ b/tests/core/worker_test.go @@ -32,7 +32,7 @@ func TestWorkerRun(t *testing.T) { `) ctx := context.Background() - err := workerCmd.Run(ctx, c, log, id) + err := workerCmd.Run(ctx, c, log, &workerCmd.Options{TaskID: id}) if err != nil { t.Fatal("unexpected error", err) } @@ -70,7 +70,7 @@ func TestWorkDirCleanup(t *testing.T) { workdir := path.Join(c.Worker.WorkDir, id) ctx := context.Background() - err := workerCmd.Run(ctx, c, log, id) + err := workerCmd.Run(ctx, c, log, &workerCmd.Options{TaskID: id}) if err != nil { t.Fatal("unexpected error", err) } @@ -99,7 +99,7 @@ func TestWorkDirCleanup(t *testing.T) { c.Worker.LeaveWorkDir = true workdir = path.Join(c.Worker.WorkDir, id) - err = workerCmd.Run(ctx, c, log, id) + err = workerCmd.Run(ctx, c, log, &workerCmd.Options{TaskID: id}) if err != nil { t.Fatal("unexpected error", err) } @@ -141,11 +141,11 @@ type taskReader struct { task *tes.Task } -func (r taskReader) Task(ctx gcontext.Context, taskID string) (*tes.Task, error) { +func (r taskReader) Task(ctx gcontext.Context) (*tes.Task, error) { return r.task, nil } -func (r taskReader) State(ctx gcontext.Context, taskID string) (tes.State, error) { +func (r taskReader) State(ctx gcontext.Context) (tes.State, error) { return r.task.State, nil } @@ -180,7 +180,7 @@ func TestLargeLogRate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - w.Run(ctx, task.Id) + w.Run(ctx) // Given the difficulty of timing how long it task a task + docker container to start, // we just check that a small amount of events were generated. 
@@ -219,7 +219,7 @@ func TestZeroLogRate(t *testing.T) { EventWriter: m, } - w.Run(context.Background(), task.Id) + w.Run(context.Background()) time.Sleep(time.Second) @@ -260,7 +260,7 @@ func TestZeroLogTailSize(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - w.Run(ctx, task.Id) + w.Run(ctx) // we expect zero events to be generated if counts.stdout != 0 { @@ -295,7 +295,7 @@ func TestLogTailContent(t *testing.T) { EventWriter: m, } - err := w.Run(context.Background(), task.Id) + err := w.Run(context.Background()) if err != nil { t.Error("unexpected worker.Run error", err) } @@ -339,7 +339,7 @@ func TestDockerContainerMetadata(t *testing.T) { EventWriter: m, } - err := w.Run(context.Background(), task.Id) + err := w.Run(context.Background()) if err != nil { t.Error("unexpected worker.Run error", err) } diff --git a/tests/kafka/worker_test.go b/tests/kafka/worker_test.go index 552a22d7a..e7f7fcdbd 100644 --- a/tests/kafka/worker_test.go +++ b/tests/kafka/worker_test.go @@ -61,7 +61,7 @@ func TestKafkaWorkerRun(t *testing.T) { --sh 'echo hello world' `) - err = workerCmd.Run(ctx, conf, log, id) + err = workerCmd.Run(ctx, conf, log, &workerCmd.Options{TaskID: id}) if err != nil { t.Fatal("unexpected error", err) } diff --git a/tests/pubsub/pubsub_test.go b/tests/pubsub/pubsub_test.go index 5f80a7b78..68260e1a0 100644 --- a/tests/pubsub/pubsub_test.go +++ b/tests/pubsub/pubsub_test.go @@ -62,7 +62,7 @@ func TestPubSubWorkerRun(t *testing.T) { // compute backend is in use id := fun.Run(`'echo hello world'`) - err := workerCmd.Run(ctx, conf, nil, id) + err := workerCmd.Run(ctx, conf, nil, &workerCmd.Options{TaskID: id}) if err != nil { t.Fatal("unexpected error", err) } diff --git a/tests/scheduler/node_test.go b/tests/scheduler/node_test.go index 7f5d9aa20..dae6327f7 100644 --- a/tests/scheduler/node_test.go +++ b/tests/scheduler/node_test.go @@ -86,11 +86,16 @@ func TestManualBackend(t *testing.T) { 
srv.Conf.Node.ID = "test-node-manual" // create a node srv.Conf.Node.ID = "test-node-manual" - w, err := workercmd.NewWorker(ctx, conf, log) - if err != nil { - t.Fatal("failed to create worker factory", err) + + factory := func(ctx context.Context, taskID string) error { + w, err := workercmd.NewWorker(ctx, conf, log, &workercmd.Options{TaskID: taskID}) + if err != nil { + return err + } + return w.Run(ctx) } - n, err := scheduler.NewNodeProcess(ctx, srv.Conf, w.Run, log) + + n, err := scheduler.NewNodeProcess(ctx, srv.Conf, factory, log) if err != nil { t.Fatal("failed to create node", err) } diff --git a/worker/base64_taskreader.go b/worker/base64_taskreader.go index eb545e2b6..8c86beb67 100644 --- a/worker/base64_taskreader.go +++ b/worker/base64_taskreader.go @@ -10,10 +10,12 @@ import ( "github.com/ohsu-comp-bio/funnel/tes" ) +// Base64TaskReader reads a task from a base64 encoded string. type Base64TaskReader struct { task *tes.Task } +// NewBase64TaskReader creates a new Base64TaskReader. func NewBase64TaskReader(raw string) (*Base64TaskReader, error) { data, err := base64.StdEncoding.DecodeString(raw) if err != nil { diff --git a/worker/file_taskreader.go b/worker/file_taskreader.go index 5feb64863..fb67519b5 100644 --- a/worker/file_taskreader.go +++ b/worker/file_taskreader.go @@ -14,6 +14,7 @@ type FileTaskReader struct { task *tes.Task } +// NewFileTaskReader creates a new FileTaskReader. 
func NewFileTaskReader(path string) (*FileTaskReader, error) { fh, err := os.Open(path) if err != nil { From d6bdd1c7a2613e9022a7e4941bf7217a81ff3efd Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Tue, 10 Jul 2018 09:59:53 -0700 Subject: [PATCH 7/7] worker: add file/base64 task reader --- cmd/worker/worker_test.go | 43 +++++++++++++++++++++++++++++ tests/core/worker_test.go | 58 +++++++++++++++++++++++++++++++++++++++ worker/file_taskreader.go | 2 ++ worker/taskreader_test.go | 40 +++++++++++++++++++++++++++ 4 files changed, 143 insertions(+) create mode 100644 worker/taskreader_test.go diff --git a/cmd/worker/worker_test.go b/cmd/worker/worker_test.go index 20d3466c6..e280601d3 100644 --- a/cmd/worker/worker_test.go +++ b/cmd/worker/worker_test.go @@ -54,3 +54,46 @@ func TestPersistentPreRun(t *testing.T) { } } } + +func TestTaskFileOption(t *testing.T) { + c, h := newCommandHooks() + h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { + if opts.TaskFile != "test.task.json" { + t.Fatal("unexpected task file option", opts.TaskFile) + } + return nil + } + + c.SetArgs([]string{"run", "--taskFile", "test.task.json"}) + c.Execute() + + h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { + if opts.TaskFile != "test.task.json" { + t.Fatal("unexpected task file option", opts.TaskFile) + } + return nil + } + + c.SetArgs([]string{"run", "-f", "test.task.json"}) + c.Execute() + + h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { + if opts.TaskBase64 != "abcd" { + t.Fatal("unexpected task base64 option", opts.TaskBase64) + } + return nil + } + + c.SetArgs([]string{"run", "--taskBase64", "abcd"}) + c.Execute() + + h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { + if opts.TaskBase64 != "abcd" { + t.Fatal("unexpected task base64 option", opts.TaskBase64) + } + return nil + } + + 
c.SetArgs([]string{"run", "-b", "abcd"}) + c.Execute() +} diff --git a/tests/core/worker_test.go b/tests/core/worker_test.go index ea0b8ed63..519c3a964 100644 --- a/tests/core/worker_test.go +++ b/tests/core/worker_test.go @@ -372,3 +372,61 @@ func TestDockerContainerMetadata(t *testing.T) { t.Error("didn't find container image hash metadata") } } + +func TestWorkerRunFileTaskReader(t *testing.T) { + tests.SetLogOutput(log, t) + c := tests.DefaultConfig() + ctx := context.Background() + + // Task builder collects events into a task view. + task := &tes.Task{} + b := events.TaskBuilder{Task: task} + + opts := &workerCmd.Options{ + TaskFile: "../../examples/hello-world.json", + } + + worker, err := workerCmd.NewWorker(ctx, c, log, opts) + if err != nil { + t.Fatal("unexpected error", err) + } + worker.EventWriter = &events.MultiWriter{b, worker.EventWriter} + + err = worker.Run(ctx) + if err != nil { + t.Fatal("unexpected error", err) + } + + if task.State != tes.Complete { + t.Error("unexpected task state") + } +} + +func TestWorkerRunBase64TaskReader(t *testing.T) { + tests.SetLogOutput(log, t) + c := tests.DefaultConfig() + ctx := context.Background() + + // Task builder collects events into a task view. 
+ task := &tes.Task{} + b := events.TaskBuilder{Task: task} + + opts := &workerCmd.Options{ + TaskBase64: "ewogICJuYW1lIjogIkhlbGxvIHdvcmxkIiwKICAiZGVzY3JpcHRpb24iOiAiRGVtb25zdHJhdGVzIHRoZSBtb3N0IGJhc2ljIGVjaG8gdGFzay4iLAogICJleGVjdXRvcnMiOiBbCiAgICB7CiAgICAgICJpbWFnZSI6ICJhbHBpbmUiLAogICAgICAiY29tbWFuZCI6IFsiZWNobyIsICJoZWxsbyB3b3JsZCJdCiAgICB9CiAgXQp9Cg==", + } + + worker, err := workerCmd.NewWorker(ctx, c, log, opts) + if err != nil { + t.Fatal("unexpected error", err) + } + worker.EventWriter = &events.MultiWriter{b, worker.EventWriter} + + err = worker.Run(ctx) + if err != nil { + t.Fatal("unexpected error", err) + } + + if task.State != tes.Complete { + t.Error("unexpected task state") + } +} diff --git a/worker/file_taskreader.go b/worker/file_taskreader.go index fb67519b5..7228cb860 100644 --- a/worker/file_taskreader.go +++ b/worker/file_taskreader.go @@ -16,6 +16,8 @@ type FileTaskReader struct { // NewFileTaskReader creates a new FileTaskReader. func NewFileTaskReader(path string) (*FileTaskReader, error) { + // TODO not sure if it's better to return an error immediately, + // or return an error from Task() fh, err := os.Open(path) if err != nil { return nil, fmt.Errorf("opening task file: %v", err) diff --git a/worker/taskreader_test.go b/worker/taskreader_test.go new file mode 100644 index 000000000..994d13c28 --- /dev/null +++ b/worker/taskreader_test.go @@ -0,0 +1,40 @@ +package worker + +import ( + "context" + "testing" +) + +func TestFileTaskReader(t *testing.T) { + r, err := NewFileTaskReader("../examples/hello-world.json") + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + task, err := r.Task(ctx) + if task.Name != "Hello world" { + t.Error("unexpected task content") + } + + if task.Id == "" { + t.Error("unexpected empty task ID") + } +} + +func TestBase64TaskReader(t *testing.T) { + r, err := 
NewBase64TaskReader("ewogICJuYW1lIjogIkhlbGxvIHdvcmxkIiwKICAiZGVzY3JpcHRpb24iOiAiRGVtb25zdHJhdGVzIHRoZSBtb3N0IGJhc2ljIGVjaG8gdGFzay4iLAogICJleGVjdXRvcnMiOiBbCiAgICB7CiAgICAgICJpbWFnZSI6ICJhbHBpbmUiLAogICAgICAiY29tbWFuZCI6IFsiZWNobyIsICJoZWxsbyB3b3JsZCJdCiAgICB9CiAgXQp9Cg==") + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + task, err := r.Task(ctx) + if task.Name != "Hello world" { + t.Error("unexpected task content") + } + + if task.Id == "" { + t.Error("unexpected empty task ID") + } +}