Skip to content

Commit

Permalink
worker: fix tests for new factory interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Buchanan committed Jul 9, 2018
1 parent bb90914 commit bf72c62
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestPersistentPreRun(t *testing.T) {
defer cleanup()

c, h := newCommandHooks()
h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, taskID string) error {
h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error {
if conf.Server.HostName != host {
t.Fatal("unexpected Server.HostName in config", conf.Server.HostName)
}
Expand Down
31 changes: 20 additions & 11 deletions tests/core/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,26 +1125,34 @@ func TestConcurrentStateUpdate(t *testing.T) {
f := tests.NewFunnel(c)
f.StartServer()

w, err := workerCmd.NewWorker(ctx, c, log)
if err != nil {
t.Fatal(err)
}
e := w.EventWriter

ids := []string{}
for i := 0; i < 10; i++ {
id := f.Run(`--sh 'echo hello'`)
ids = append(ids, id)

go func(id string) {
opts := &workerCmd.Options{TaskID: id}
w, err := workerCmd.NewWorker(ctx, c, log, opts)
if err != nil {
t.Fatal(err)
}

log.Info("writing state initializing event", "taskID", id)
err := e.WriteEvent(ctx, events.NewState(id, tes.Initializing))
err = w.EventWriter.WriteEvent(ctx, events.NewState(id, tes.Initializing))
if err != nil {
log.Error("error writing event", err)
}
}(id)

go func(id string) {
opts := &workerCmd.Options{TaskID: id}
w, err := workerCmd.NewWorker(ctx, c, log, opts)
if err != nil {
t.Fatal(err)
}

log.Info("writing state canceled event", "taskID", id)
err := e.WriteEvent(ctx, events.NewState(id, tes.Canceled))
err = w.EventWriter.WriteEvent(ctx, events.NewState(id, tes.Canceled))
if err != nil {
log.Error("error writing event", "error", err, "taskID", id)
}
Expand All @@ -1169,12 +1177,13 @@ func TestMetadataEvent(t *testing.T) {
f := tests.NewFunnel(c)
f.StartServer()

w, err := workerCmd.NewWorker(ctx, c, log)
id := f.Run(`--sh 'echo hello'`)

w, err := workerCmd.NewWorker(ctx, c, log, &workerCmd.Options{TaskID: id})
if err != nil {
t.Fatal(err)
}
e := w.EventWriter
id := f.Run(`--sh 'echo hello'`)

err = e.WriteEvent(ctx, events.NewMetadata(id, 0, map[string]string{"one": "two"}))
if err != nil {
Expand All @@ -1185,7 +1194,7 @@ func TestMetadataEvent(t *testing.T) {
t.Error("error writing event", "error", err, "taskID", id)
}

err = w.Run(ctx, id)
err = w.Run(ctx)
if err != nil {
t.Error("error running task", "error", err, "taskID", id)
}
Expand Down
20 changes: 10 additions & 10 deletions tests/core/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestWorkerRun(t *testing.T) {
`)

ctx := context.Background()
err := workerCmd.Run(ctx, c, log, id)
err := workerCmd.Run(ctx, c, log, &workerCmd.Options{TaskID: id})
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestWorkDirCleanup(t *testing.T) {
workdir := path.Join(c.Worker.WorkDir, id)

ctx := context.Background()
err := workerCmd.Run(ctx, c, log, id)
err := workerCmd.Run(ctx, c, log, &workerCmd.Options{TaskID: id})
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestWorkDirCleanup(t *testing.T) {
c.Worker.LeaveWorkDir = true
workdir = path.Join(c.Worker.WorkDir, id)

err = workerCmd.Run(ctx, c, log, id)
err = workerCmd.Run(ctx, c, log, &workerCmd.Options{TaskID: id})
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down Expand Up @@ -141,11 +141,11 @@ type taskReader struct {
task *tes.Task
}

func (r taskReader) Task(ctx gcontext.Context, taskID string) (*tes.Task, error) {
func (r taskReader) Task(ctx gcontext.Context) (*tes.Task, error) {
return r.task, nil
}

func (r taskReader) State(ctx gcontext.Context, taskID string) (tes.State, error) {
func (r taskReader) State(ctx gcontext.Context) (tes.State, error) {
return r.task.State, nil
}

Expand Down Expand Up @@ -180,7 +180,7 @@ func TestLargeLogRate(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
w.Run(ctx, task.Id)
w.Run(ctx)

// Given the difficulty of timing how long it task a task + docker container to start,
// we just check that a small amount of events were generated.
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestZeroLogRate(t *testing.T) {
EventWriter: m,
}

w.Run(context.Background(), task.Id)
w.Run(context.Background())

time.Sleep(time.Second)

Expand Down Expand Up @@ -260,7 +260,7 @@ func TestZeroLogTailSize(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
w.Run(ctx, task.Id)
w.Run(ctx)

// we expect zero events to be generated
if counts.stdout != 0 {
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestLogTailContent(t *testing.T) {
EventWriter: m,
}

err := w.Run(context.Background(), task.Id)
err := w.Run(context.Background())
if err != nil {
t.Error("unexpected worker.Run error", err)
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestDockerContainerMetadata(t *testing.T) {
EventWriter: m,
}

err := w.Run(context.Background(), task.Id)
err := w.Run(context.Background())
if err != nil {
t.Error("unexpected worker.Run error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/kafka/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestKafkaWorkerRun(t *testing.T) {
--sh 'echo hello world'
`)

err = workerCmd.Run(ctx, conf, log, id)
err = workerCmd.Run(ctx, conf, log, &workerCmd.Options{TaskID: id})
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestPubSubWorkerRun(t *testing.T) {
// compute backend is in use
id := fun.Run(`'echo hello world'`)

err := workerCmd.Run(ctx, conf, nil, id)
err := workerCmd.Run(ctx, conf, nil, &workerCmd.Options{TaskID: id})
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down
13 changes: 9 additions & 4 deletions tests/scheduler/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,16 @@ func TestManualBackend(t *testing.T) {
srv.Conf.Node.ID = "test-node-manual"
// create a node
srv.Conf.Node.ID = "test-node-manual"
w, err := workercmd.NewWorker(ctx, conf, log)
if err != nil {
t.Fatal("failed to create worker factory", err)

factory := func(ctx context.Context, taskID string) error {
w, err := workercmd.NewWorker(ctx, conf, log, &workercmd.Options{TaskID: taskID})
if err != nil {
return err
}
return w.Run(ctx)
}
n, err := scheduler.NewNodeProcess(ctx, srv.Conf, w.Run, log)

n, err := scheduler.NewNodeProcess(ctx, srv.Conf, factory, log)
if err != nil {
t.Fatal("failed to create node", err)
}
Expand Down
2 changes: 2 additions & 0 deletions worker/base64_taskreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/ohsu-comp-bio/funnel/tes"
)

// Base64TaskReader reads a task from a base64 encoded string.
type Base64TaskReader struct {
task *tes.Task
}

// NewBase64TaskReader creates a new Base64TaskReader.
func NewBase64TaskReader(raw string) (*Base64TaskReader, error) {
data, err := base64.StdEncoding.DecodeString(raw)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions worker/file_taskreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type FileTaskReader struct {
task *tes.Task
}

// NewFileTaskReader creates a new FileTaskReader.
func NewFileTaskReader(path string) (*FileTaskReader, error) {
fh, err := os.Open(path)
if err != nil {
Expand Down

0 comments on commit bf72c62

Please sign in to comment.