Skip to content

Commit 198c1c4

Browse files
committed
Use Go's context to pass around
1 parent 503de6f commit 198c1c4

File tree

16 files changed

+150
-163
lines changed

16 files changed

+150
-163
lines changed

examples/abort_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"errors"
89
"testing"
910

@@ -22,11 +23,11 @@ func TestExample_Abort(t *testing.T) {
2223
assert.True(t, result.IsAborted())
2324
}
2425

25-
func doNotExecute(_ pipeline.Context) error {
26+
func doNotExecute(_ context.Context) error {
2627
return errors.New("should not execute")
2728
}
2829

29-
func abort(_ pipeline.Context) error {
30+
func abort(_ context.Context) error {
3031
// some logic that can handle errors, but you don't want to bubble up the error.
3132

3233
// terminate pipeline gracefully

examples/context_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"fmt"
89
"math/rand"
910
"testing"
@@ -18,7 +19,7 @@ type Data struct {
1819
func TestExample_Context(t *testing.T) {
1920
// Create pipeline with defaults
2021
p := pipeline.NewPipeline()
21-
p.WithContext(&Data{})
22+
p.WithContext(context.WithValue(context.Background(), "data", &Data{}))
2223
p.WithSteps(
2324
pipeline.NewStep("define random number", defineNumber),
2425
pipeline.NewStepFromFunc("print number", printNumber),
@@ -29,12 +30,12 @@ func TestExample_Context(t *testing.T) {
2930
}
3031
}
3132

32-
func defineNumber(ctx pipeline.Context) pipeline.Result {
33-
ctx.(*Data).Number = rand.Int()
33+
func defineNumber(ctx context.Context) pipeline.Result {
34+
ctx.Value("data").(*Data).Number = rand.Int()
3435
return pipeline.Result{}
3536
}
3637

37-
func printNumber(ctx pipeline.Context) error {
38-
_, err := fmt.Println(ctx.(*Data).Number)
38+
func printNumber(ctx context.Context) error {
39+
_, err := fmt.Println(ctx.Value("data").(*Data).Number)
3940
return err
4041
}

examples/git_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"log"
89
"os"
910
"os/exec"
@@ -26,27 +27,27 @@ func TestExample_Git(t *testing.T) {
2627
}
2728
}
2829

29-
func logSuccess(ctx pipeline.Context, result pipeline.Result) error {
30+
func logSuccess(_ context.Context, result pipeline.Result) error {
3031
log.Println("handler called")
3132
return result.Err
3233
}
3334

3435
func CloneGitRepository() pipeline.ActionFunc {
35-
return func(_ pipeline.Context) pipeline.Result {
36+
return func(_ context.Context) pipeline.Result {
3637
err := execGitCommand("clone", "git@github.com/ccremer/go-command-pipeline")
3738
return pipeline.Result{Err: err}
3839
}
3940
}
4041

4142
func Pull() pipeline.ActionFunc {
42-
return func(_ pipeline.Context) pipeline.Result {
43+
return func(_ context.Context) pipeline.Result {
4344
err := execGitCommand("pull")
4445
return pipeline.Result{Err: err}
4546
}
4647
}
4748

4849
func CheckoutBranch() pipeline.ActionFunc {
49-
return func(_ pipeline.Context) pipeline.Result {
50+
return func(_ context.Context) pipeline.Result {
5051
err := execGitCommand("checkout", "master")
5152
return pipeline.Result{Err: err}
5253
}
@@ -61,7 +62,7 @@ func execGitCommand(args ...string) error {
6162
}
6263

6364
func DirExists(path string) predicate.Predicate {
64-
return func(_ pipeline.Context) bool {
65+
return func(_ context.Context) bool {
6566
if info, err := os.Stat(path); err != nil || !info.IsDir() {
6667
return false
6768
}

examples/hooks_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"fmt"
89
"testing"
910

@@ -25,7 +26,7 @@ func TestExample_Hooks(t *testing.T) {
2526
}
2627

2728
func AfterHookAction() pipeline.ActionFunc {
28-
return func(ctx pipeline.Context) pipeline.Result {
29+
return func(ctx context.Context) pipeline.Result {
2930
fmt.Println("I am called in an action after the hooks")
3031
return pipeline.Result{}
3132
}

options_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pipeline
22

33
import (
4+
"context"
45
"errors"
56
"testing"
67

@@ -11,7 +12,7 @@ func TestPipeline_WithOptions(t *testing.T) {
1112
t.Run("DisableErrorWrapping", func(t *testing.T) {
1213
p := NewPipeline().WithOptions(DisableErrorWrapping)
1314
p.WithSteps(
14-
NewStepFromFunc("disabled error wrapping", func(_ Context) error {
15+
NewStepFromFunc("disabled error wrapping", func(_ context.Context) error {
1516
return errors.New("some error")
1617
}),
1718
)

parallel/fanout.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package parallel
22

33
import (
4+
"context"
45
"sync"
56

67
pipeline "github.com/ccremer/go-command-pipeline"
@@ -11,12 +12,12 @@ NewFanOutStep creates a pipeline step that runs nested pipelines in their own Go
1112
The function provided as PipelineSupplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever.
1213
The step waits until all pipelines are finished.
1314
If the given ResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
14-
The given pipelines have to define their own pipeline.Context, it's not passed "down" from parent pipeline.
15-
However, The pipeline.Context for the ResultHandler will be the one from parent pipeline.
15+
The given pipelines have to define their own context.Context, it's not passed "down" from parent pipeline.
16+
However, The context.Context for the ResultHandler will be the one from parent pipeline.
1617
*/
1718
func NewFanOutStep(name string, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step {
1819
step := pipeline.Step{Name: name}
19-
step.F = func(ctx pipeline.Context) pipeline.Result {
20+
step.F = func(ctx context.Context) pipeline.Result {
2021
pipelineChan := make(chan *pipeline.Pipeline)
2122
m := sync.Map{}
2223
var wg sync.WaitGroup

parallel/fanout_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package parallel
22

33
import (
4+
"context"
45
"fmt"
56
"sync/atomic"
67
"testing"
@@ -29,7 +30,7 @@ func TestNewFanOutStep(t *testing.T) {
2930
"GivenPipelineWith_WhenRunningStep_ThenReturnSuccessButRunErrorHandler": {
3031
jobs: 1,
3132
returnErr: fmt.Errorf("should be called"),
32-
givenResultHandler: func(ctx pipeline.Context, _ map[uint64]pipeline.Result) pipeline.Result {
33+
givenResultHandler: func(ctx context.Context, _ map[uint64]pipeline.Result) pipeline.Result {
3334
atomic.AddUint64(&counts, 1)
3435
return pipeline.Result{}
3536
},
@@ -41,15 +42,15 @@ func TestNewFanOutStep(t *testing.T) {
4142
t.Run(name, func(t *testing.T) {
4243
handler := tt.givenResultHandler
4344
if handler == nil {
44-
handler = func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
45+
handler = func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
4546
assert.NoError(t, results[0].Err)
4647
return pipeline.Result{}
4748
}
4849
}
4950
step := NewFanOutStep("fanout", func(funcs chan *pipeline.Pipeline) {
5051
defer close(funcs)
5152
for i := 0; i < tt.jobs; i++ {
52-
funcs <- pipeline.NewPipeline().WithSteps(pipeline.NewStep("step", func(_ pipeline.Context) pipeline.Result {
53+
funcs <- pipeline.NewPipeline().WithSteps(pipeline.NewStep("step", func(_ context.Context) pipeline.Result {
5354
atomic.AddUint64(&counts, 1)
5455
return pipeline.Result{Err: tt.returnErr}
5556
}))
@@ -69,13 +70,13 @@ func ExampleNewFanOutStep() {
6970
// create some pipelines
7071
for i := 0; i < 3; i++ {
7172
n := i
72-
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ pipeline.Context) pipeline.Result {
73+
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ context.Context) pipeline.Result {
7374
time.Sleep(time.Duration(n * 10000000)) // fake some load
7475
fmt.Println(fmt.Sprintf("I am worker %d", n))
7576
return pipeline.Result{}
7677
}))
7778
}
78-
}, func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
79+
}, func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
7980
for worker, result := range results {
8081
if result.IsFailed() {
8182
fmt.Println(fmt.Sprintf("Worker %d failed: %v", worker, result.Err))

parallel/pool.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package parallel
22

33
import (
4+
"context"
45
"sync"
56
"sync/atomic"
67

@@ -15,15 +16,15 @@ The step waits until all pipelines are finished.
1516
* The pipelines are executed in a pool of a number of Go routines indicated by size.
1617
* If size is 1, the pipelines are effectively run in sequence.
1718
* If size is 0 or less, the function panics.
18-
The given pipelines have to define their own pipeline.Context, it's not passed "down" from parent pipeline.
19-
However, The pipeline.Context for the ResultHandler will be the one from parent pipeline.
19+
The given pipelines have to define their own context.Context, it's not passed "down" from parent pipeline.
20+
However, The context.Context for the ResultHandler will be the one from parent pipeline.
2021
*/
2122
func NewWorkerPoolStep(name string, size int, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step {
2223
if size < 1 {
2324
panic("pool size cannot be lower than 1")
2425
}
2526
step := pipeline.Step{Name: name}
26-
step.F = func(ctx pipeline.Context) pipeline.Result {
27+
step.F = func(ctx context.Context) pipeline.Result {
2728
pipelineChan := make(chan *pipeline.Pipeline, size)
2829
m := sync.Map{}
2930
var wg sync.WaitGroup

parallel/pool_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package parallel
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"sync/atomic"
@@ -37,11 +38,11 @@ func TestNewWorkerPoolStep(t *testing.T) {
3738
}
3839
step := NewWorkerPoolStep("pool", 1, func(pipelines chan *pipeline.Pipeline) {
3940
defer close(pipelines)
40-
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep("step", func(_ pipeline.Context) pipeline.Result {
41+
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep("step", func(_ context.Context) pipeline.Result {
4142
atomic.AddUint64(&counts, 1)
4243
return pipeline.Result{Err: tt.expectedError}
4344
}))
44-
}, func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
45+
}, func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
4546
assert.Error(t, results[0].Err)
4647
return pipeline.Result{Err: results[0].Err}
4748
})
@@ -58,13 +59,13 @@ func ExampleNewWorkerPoolStep() {
5859
// create some pipelines
5960
for i := 0; i < 3; i++ {
6061
n := i
61-
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ pipeline.Context) pipeline.Result {
62+
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStep(fmt.Sprintf("i = %d", n), func(_ context.Context) pipeline.Result {
6263
time.Sleep(time.Duration(n * 100000000)) // fake some load
6364
fmt.Println(fmt.Sprintf("This is job item %d", n))
6465
return pipeline.Result{}
6566
}))
6667
}
67-
}, func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result {
68+
}, func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
6869
for jobIndex, result := range results {
6970
if result.IsFailed() {
7071
fmt.Println(fmt.Sprintf("Job %d failed: %v", jobIndex, result.Err))

parallel/types.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Package parallel extends the command-pipeline core with concurrency steps.
44
package parallel
55

66
import (
7+
"context"
78
"sync"
89

910
pipeline "github.com/ccremer/go-command-pipeline"
@@ -12,15 +13,14 @@ import (
1213
type (
1314
// ResultHandler is a callback that provides a result map and expect a single, combined pipeline.Result object.
1415
// The map key is a zero-based index of n-th pipeline.Pipeline spawned, e.g. pipeline number 3 will have index 2.
15-
// Context may be nil.
1616
// Return an empty pipeline.Result if you want to ignore errors, or reduce multiple errors into a single one to make the parent pipeline fail.
17-
ResultHandler func(ctx pipeline.Context, results map[uint64]pipeline.Result) pipeline.Result
17+
ResultHandler func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result
1818
// PipelineSupplier is a function that spawns pipeline.Pipeline for consumption.
1919
// The function must close the channel once all pipelines are spawned (`defer close()` recommended).
2020
PipelineSupplier func(chan *pipeline.Pipeline)
2121
)
2222

23-
func collectResults(ctx pipeline.Context, handler ResultHandler, m *sync.Map) pipeline.Result {
23+
func collectResults(ctx context.Context, handler ResultHandler, m *sync.Map) pipeline.Result {
2424
collectiveResult := pipeline.Result{}
2525
if handler != nil {
2626
// convert sync.Map to conventional map for easier access

0 commit comments

Comments
 (0)