Skip to content

Commit

Permalink
cmd/worker: refactor init, handle file/string task reader source
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Buchanan committed Jul 9, 2018
1 parent a56754f commit bb90914
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 89 deletions.
222 changes: 150 additions & 72 deletions cmd/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ohsu-comp-bio/funnel/logger"
"github.com/ohsu-comp-bio/funnel/storage"
"github.com/ohsu-comp-bio/funnel/tes"
"github.com/ohsu-comp-bio/funnel/util"
"github.com/ohsu-comp-bio/funnel/worker"
)

Expand All @@ -30,97 +31,174 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *Opti
func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) (*worker.DefaultWorker, error) {
log.Debug("NewWorker", "config", conf)

var err error
var db tes.ReadOnlyServer
var reader worker.TaskReader
var writer events.Writer
var writers events.MultiWriter

eventWriterSet := map[string]interface{}{
strings.ToLower(conf.Database): nil,
}
for _, w := range conf.EventWriters {
eventWriterSet[strings.ToLower(w)] = nil
err := validateConfig(conf, opts)
if err != nil {
return nil, fmt.Errorf("validating config: %v", err)
}

// TODO bad hack
if opts.TaskFile != "" {
writer = &events.Logger{Log: log}
} else {
for e := range eventWriterSet {
switch e {
case "log":
writer = &events.Logger{Log: log}
case "boltdb":
writer, err = events.NewRPCWriter(ctx, conf.RPCClient)
case "badger":
writer, err = events.NewRPCWriter(ctx, conf.RPCClient)
case "dynamodb":
writer, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "datastore":
writer, err = datastore.NewDatastore(conf.Datastore)
case "elastic":
writer, err = elastic.NewElastic(conf.Elastic)
case "kafka":
writer, err = events.NewKafkaWriter(ctx, conf.Kafka)
case "pubsub":
writer, err = events.NewPubSubWriter(ctx, conf.PubSub)
case "mongodb":
writer, err = mongodb.NewMongoDB(conf.MongoDB)
default:
err = fmt.Errorf("unknown event writer: %s", e)
}
if err != nil {
return nil, fmt.Errorf("error occurred while initializing the %s event writer: %v", e, err)
}
}
// Construct a set of event writers based on the config.
builder := eventWriterBuilder{}
// If the task comes from a file or string,
// don't assume we should write to the database.
if opts.TaskFile == "" && opts.TaskBase64 == "" {
builder.Add(ctx, conf.Database, conf, log)
}
if writer != nil {
writers = append(writers, writer)
// Add configured event writers.
for _, e := range conf.EventWriters {
builder.Add(ctx, e, conf, log)
}
// Get the built writer.
writer, err := builder.Writer()
if err != nil {
return nil, fmt.Errorf("creating event writers: %v", err)
}

writer = &events.SystemLogFilter{Writer: &writers, Level: conf.Logger.Level}
// Wrap the event writers in a couple filters.
writer = &events.SystemLogFilter{Writer: writer, Level: conf.Logger.Level}
writer = &events.ErrLogger{Writer: writer, Log: log}

if opts.TaskFile != "" {
reader = &worker.FileTaskReader{Path: opts.TaskFile}
} else {
switch strings.ToLower(conf.Database) {
case "datastore":
db, err = datastore.NewDatastore(conf.Datastore)
case "dynamodb":
db, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "elastic":
db, err = elastic.NewElastic(conf.Elastic)
case "mongodb":
db, err = mongodb.NewMongoDB(conf.MongoDB)
case "boltdb":
reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID)
case "badger":
reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID)
default:
err = fmt.Errorf("unknown database: '%s'", conf.Database)
}
if err != nil {
return nil, fmt.Errorf("failed to instantiate database client: %v", err)
}
if reader == nil {
reader = worker.NewGenericTaskReader(db.GetTask, opts.TaskID)
}
// Get the task source reader: database, file, etc.
reader, err := newTaskReader(ctx, conf, opts)
if err != nil {
return nil, fmt.Errorf("creating task reader: %v", err)
}

// Initialize task storage client.
store, err := storage.NewMux(conf)
if err != nil {
return nil, fmt.Errorf("failed to instantiate Storage backend: %v", err)
}
store.AttachLogger(log)

w := &worker.DefaultWorker{
return &worker.DefaultWorker{
Conf: conf.Worker,
Store: store,
TaskReader: reader,
EventWriter: writer,
}, nil
}

// newTaskReader finds a TaskReader implementation that matches the config
// and commandline options.
func newTaskReader(ctx context.Context, conf config.Config, opts *Options) (worker.TaskReader, error) {

switch {
// These readers are used to read a local task from a file, cli arg, etc.
case opts.TaskFile != "":
return worker.NewFileTaskReader(opts.TaskFile)

case opts.TaskBase64 != "":
return worker.NewBase64TaskReader(opts.TaskBase64)
}

switch strings.ToLower(conf.Database) {
// These readers will connect to the configured task database.
case "datastore":
db, err := datastore.NewDatastore(conf.Datastore)
return newDatabaseTaskReader(opts.TaskID, db, err)

case "dynamodb":
db, err := dynamodb.NewDynamoDB(conf.DynamoDB)
return newDatabaseTaskReader(opts.TaskID, db, err)

case "elastic":
db, err := elastic.NewElastic(conf.Elastic)
return newDatabaseTaskReader(opts.TaskID, db, err)

case "mongodb":
db, err := mongodb.NewMongoDB(conf.MongoDB)
return newDatabaseTaskReader(opts.TaskID, db, err)

// These readers connect via RPC (because the database is embedded in the server).
case "boltdb", "badger":
return worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID)

// No matching reader. Fail.
default:
return nil, fmt.Errorf("no matching task reader found")
}
}

return w, nil
// newDatabaseTaskReader helps create a generic task reader wrapper
// for the given database backend.
func newDatabaseTaskReader(taskID string, db tes.ReadOnlyServer, err error) (worker.TaskReader, error) {
if err != nil {
return nil, fmt.Errorf("creating database task reader: %v", err)
}
return worker.NewGenericTaskReader(db.GetTask, taskID), nil
}

// eventWriterBuilder is a helper for building a set of event writers,
// collecting errors, de-duplicating config, etc.
type eventWriterBuilder struct {
errors util.MultiError
writers events.MultiWriter
// seen tracks which event writers have already been built,
// so we don't build the same one twice.
seen map[string]bool
}

// Writers gets all the event writers and errors collected by multiple calls to Add().
func (e *eventWriterBuilder) Writer() (events.Writer, error) {
return &e.writers, e.errors.ToError()
}

// Add creates a new event writer by name and adds it to the builder.
func (e *eventWriterBuilder) Add(ctx context.Context, name string, conf config.Config, log *logger.Logger) {
if name == "" {
return
}

if e.seen == nil {
e.seen = map[string]bool{}
}

// If we've already created this event writer "name", skip it.
if _, ok := e.seen[name]; ok {
return
}
e.seen[name] = true

var err error
var writer events.Writer

switch name {
case "log":
writer = &events.Logger{Log: log}
case "boltdb", "badger":
writer, err = events.NewRPCWriter(ctx, conf.RPCClient)
case "dynamodb":
writer, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "datastore":
writer, err = datastore.NewDatastore(conf.Datastore)
case "elastic":
writer, err = elastic.NewElastic(conf.Elastic)
case "kafka":
writer, err = events.NewKafkaWriter(ctx, conf.Kafka)
case "pubsub":
writer, err = events.NewPubSubWriter(ctx, conf.PubSub)
case "mongodb":
writer, err = mongodb.NewMongoDB(conf.MongoDB)
default:
err = fmt.Errorf("unknown event writer: %s", name)
}

if err != nil {
e.errors = append(e.errors, err)
} else {
e.writers = append(e.writers, writer)
}
}

func validateConfig(conf config.Config, opts *Options) error {
// If the task reader is a file or string,
// only a subset of event writers are supported.
if opts.TaskFile != "" || opts.TaskBase64 != "" {
for _, e := range conf.EventWriters {
if e != "log" && e != "kafka" && e != "pubsub" {
return fmt.Errorf("event writer %q is not supported with a task file/string reader", e)
}
}
}
return nil
}
6 changes: 3 additions & 3 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

// Options holds a few CLI options for worker entrypoints.
type Options struct {
TaskID string
TaskFile string
TaskBase64 string
TaskID string
TaskFile string
TaskBase64 string
}

// NewCommand returns the worker command
Expand Down
2 changes: 1 addition & 1 deletion server/tes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TaskService struct {
// This is part of the TES implementation.
func (ts *TaskService) CreateTask(ctx context.Context, task *tes.Task) (*tes.CreateTaskResponse, error) {

if err := tes.InitTask(task); err != nil {
if err := tes.InitTask(task, true); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, err.Error())
}

Expand Down
18 changes: 14 additions & 4 deletions tes/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,20 @@ func GenerateID() string {
// such as Id, CreationTime, State, etc. If the task fails validation,
// an error is returned. See Validate().
// The given task is modified.
func InitTask(task *Task) error {
task.Id = GenerateID()
task.State = Queued
task.CreationTime = time.Now().Format(time.RFC3339Nano)
//
// If "overwrite" is true, the fields Id, State, and CreationTime
// will always be overwritten, even if already set, otherwise they
// will only be set if they are empty.
func InitTask(task *Task, overwrite bool) error {
if overwrite || task.Id == "" {
task.Id = GenerateID()
}
if overwrite || task.State == Unknown {
task.State = Queued
}
if overwrite || task.CreationTime == "" {
task.CreationTime = time.Now().Format(time.RFC3339Nano)
}
if err := Validate(task); err != nil {
return fmt.Errorf("invalid task message:\n%s", err)
}
Expand Down
16 changes: 11 additions & 5 deletions worker/base64_taskreader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package worker

import (
"bytes"
"bytes"
"context"
"encoding/base64"
"encoding/base64"
"fmt"

"github.com/golang/protobuf/jsonpb"
Expand All @@ -15,18 +15,24 @@ type Base64TaskReader struct {
}

func NewBase64TaskReader(raw string) (*Base64TaskReader, error) {
data, err := base64.StdEncoding.DecodeString(raw)
data, err := base64.StdEncoding.DecodeString(raw)
if err != nil {
return nil, fmt.Errorf("decoding task: %v", err)
}

task := &tes.Task{}
buf := bytes.NewBuffer(data)
buf := bytes.NewBuffer(data)
err = jsonpb.Unmarshal(buf, task)
if err != nil {
return nil, fmt.Errorf("unmarshaling task: %v", err)
}
return &Base64TaskReader{task}, nil

err = tes.InitTask(task, false)
if err != nil {
return nil, fmt.Errorf("initializing task: %v", err)
}

return &Base64TaskReader{task}, nil
}

// Task returns the task. A random ID will be generated.
Expand Down
8 changes: 4 additions & 4 deletions worker/file_taskreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ func NewFileTaskReader(path string) (*FileTaskReader, error) {
return nil, fmt.Errorf("unmarshaling task file: %v", err)
}

err = tes.InitTask(task)
err = tes.InitTask(task, false)
if err != nil {
return nil, fmt.Errorf("validating task: %v", err)
return nil, fmt.Errorf("initializing task: %v", err)
}

return &FileTaskReader{task}, nil
return &FileTaskReader{task}, nil
}

// Task returns the task. A random ID will be generated.
Expand All @@ -44,5 +44,5 @@ func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error) {
// of this reader, and since there is no online database to connect to,
// this will always return QUEUED.
func (f *FileTaskReader) State(ctx context.Context) (tes.State, error) {
return f.task.GetState(), nil
return f.task.GetState(), nil
}

0 comments on commit bb90914

Please sign in to comment.