Skip to content

Commit 36dadc2

Browse files
authored
Merge pull request #29 from ccremer/context
Support pipeline cancellation with Go's native Context
2 parents 503de6f + 9fa005b commit 36dadc2

22 files changed

+492
-260
lines changed

README.md

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,39 @@ Small Go utility that executes business actions in a pipeline.
1111

1212
```go
1313
import (
14+
"context"
1415
pipeline "github.com/ccremer/go-command-pipeline"
1516
"github.com/ccremer/go-command-pipeline/predicate"
1617
)
1718

19+
type Data struct {
20+
Number int
21+
}
22+
1823
func main() {
19-
number := 0 // define arbitrary data to pass around in the steps.
24+
data := &Data // define arbitrary data to pass around in the steps.
2025
p := pipeline.NewPipeline()
21-
p.WithContext(&number)
2226
p.WithSteps(
2327
pipeline.NewStep("define random number", defineNumber),
2428
pipeline.NewStepFromFunc("print number", printNumber),
2529
)
26-
result := p.Run()
30+
result := p.RunWithContext(context.WithValue(context.Background, "data", data))
2731
if !result.IsSuccessful() {
2832
log.Fatal(result.Err)
2933
}
3034
}
3135

32-
func defineNumber(ctx pipeline.Context) pipeline.Result {
33-
ctx.(*int) = 10
36+
func defineNumber(ctx context.Context) pipeline.Result {
37+
ctx.Value("data").(*Data).Number = 10
3438
return pipeline.Result{}
3539
}
3640

3741
// Let's assume this is a business function that can fail.
3842
// You can enable "automatic" fail-on-first-error pipelines by having more small functions that return errors.
39-
func printNumber(ctx pipeline.Context) error {
40-
number := ctx.(*int)
41-
_, err := fmt.Println(*number)
42-
return err
43+
func printNumber(ctx context.Context) error {
44+
number := ctx.Value("data").(*Data).Number
45+
fmt.Println(number)
46+
return nil
4347
}
4448
```
4549

@@ -70,18 +74,18 @@ We have tons of `if err != nil` that bloats the function with more error handlin
7074

7175
It could be simplified to something like this:
7276
```go
73-
func Persist(data Data) error {
74-
p := pipeline.NewPipeline().WithContext(data).WithSteps(
77+
func Persist(data *Data) error {
78+
p := pipeline.NewPipeline().WithSteps(
7579
pipeline.NewStep("prepareTransaction", prepareTransaction()),
7680
pipeline.NewStep("executeQuery", executeQuery()),
7781
pipeline.NewStep("commitTransaction", commit()),
7882
)
79-
return p.Run().Err
83+
return p.RunWithContext(context.WithValue(context.Background(), myKey, data).Err
8084
}
8185

8286
func executeQuery() pipeline.ActionFunc {
83-
return func(ctx pipeline.Context) pipeline.Result {
84-
data := ctx.(Data)
87+
return func(ctx context.Context) pipeline.Result {
88+
data := ctx.Value(myKey).(*Data)
8589
err := database.executeQuery("SOME QUERY", data)
8690
return pipeline.Result{Err: err}
8791
)

examples/abort_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"errors"
89
"testing"
910

@@ -22,11 +23,11 @@ func TestExample_Abort(t *testing.T) {
2223
assert.True(t, result.IsAborted())
2324
}
2425

25-
func doNotExecute(_ pipeline.Context) error {
26+
func doNotExecute(_ context.Context) error {
2627
return errors.New("should not execute")
2728
}
2829

29-
func abort(_ pipeline.Context) error {
30+
func abort(_ context.Context) error {
3031
// some logic that can handle errors, but you don't want to bubble up the error.
3132

3233
// terminate pipeline gracefully

examples/context_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"fmt"
89
"math/rand"
910
"testing"
@@ -15,26 +16,27 @@ type Data struct {
1516
Number int
1617
}
1718

19+
var key = struct{}{}
20+
1821
func TestExample_Context(t *testing.T) {
1922
// Create pipeline with defaults
2023
p := pipeline.NewPipeline()
21-
p.WithContext(&Data{})
2224
p.WithSteps(
2325
pipeline.NewStep("define random number", defineNumber),
2426
pipeline.NewStepFromFunc("print number", printNumber),
2527
)
26-
result := p.Run()
28+
result := p.RunWithContext(context.WithValue(context.Background(), key, &Data{}))
2729
if !result.IsSuccessful() {
2830
t.Fatal(result.Err)
2931
}
3032
}
3133

32-
func defineNumber(ctx pipeline.Context) pipeline.Result {
33-
ctx.(*Data).Number = rand.Int()
34+
func defineNumber(ctx context.Context) pipeline.Result {
35+
ctx.Value(key).(*Data).Number = rand.Int()
3436
return pipeline.Result{}
3537
}
3638

37-
func printNumber(ctx pipeline.Context) error {
38-
_, err := fmt.Println(ctx.(*Data).Number)
39+
func printNumber(ctx context.Context) error {
40+
_, err := fmt.Println(ctx.Value(key).(*Data).Number)
3941
return err
4042
}

examples/git_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"log"
89
"os"
910
"os/exec"
@@ -26,27 +27,27 @@ func TestExample_Git(t *testing.T) {
2627
}
2728
}
2829

29-
func logSuccess(ctx pipeline.Context, result pipeline.Result) error {
30+
func logSuccess(_ context.Context, result pipeline.Result) error {
3031
log.Println("handler called")
3132
return result.Err
3233
}
3334

3435
func CloneGitRepository() pipeline.ActionFunc {
35-
return func(_ pipeline.Context) pipeline.Result {
36+
return func(_ context.Context) pipeline.Result {
3637
err := execGitCommand("clone", "git@github.com/ccremer/go-command-pipeline")
3738
return pipeline.Result{Err: err}
3839
}
3940
}
4041

4142
func Pull() pipeline.ActionFunc {
42-
return func(_ pipeline.Context) pipeline.Result {
43+
return func(_ context.Context) pipeline.Result {
4344
err := execGitCommand("pull")
4445
return pipeline.Result{Err: err}
4546
}
4647
}
4748

4849
func CheckoutBranch() pipeline.ActionFunc {
49-
return func(_ pipeline.Context) pipeline.Result {
50+
return func(_ context.Context) pipeline.Result {
5051
err := execGitCommand("checkout", "master")
5152
return pipeline.Result{Err: err}
5253
}
@@ -61,7 +62,7 @@ func execGitCommand(args ...string) error {
6162
}
6263

6364
func DirExists(path string) predicate.Predicate {
64-
return func(_ pipeline.Context) bool {
65+
return func(_ context.Context) bool {
6566
if info, err := os.Stat(path); err != nil || !info.IsDir() {
6667
return false
6768
}

examples/hooks_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package examples
55

66
import (
7+
"context"
78
"fmt"
89
"testing"
910

@@ -16,17 +17,15 @@ func TestExample_Hooks(t *testing.T) {
1617
fmt.Println(fmt.Sprintf("Executing step: %s", step.Name))
1718
})
1819
p.WithSteps(
19-
pipeline.NewStep("hook demo", AfterHookAction()),
20+
pipeline.NewStepFromFunc("hook demo", AfterHookAction),
2021
)
2122
result := p.Run()
2223
if !result.IsSuccessful() {
2324
t.Fatal(result.Err)
2425
}
2526
}
2627

27-
func AfterHookAction() pipeline.ActionFunc {
28-
return func(ctx pipeline.Context) pipeline.Result {
29-
fmt.Println("I am called in an action after the hooks")
30-
return pipeline.Result{}
31-
}
28+
func AfterHookAction(_ context.Context) error {
29+
fmt.Println("I am called in an action after the hooks")
30+
return nil
3231
}

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ module github.com/ccremer/go-command-pipeline
22

33
go 1.17
44

5-
require github.com/stretchr/testify v1.7.0
5+
require (
6+
github.com/stretchr/testify v1.7.0
7+
go.uber.org/goleak v1.1.12
8+
)
69

710
require (
811
github.com/davecgh/go-spew v1.1.0 // indirect

go.sum

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,47 @@
11
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
4+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
5+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
6+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
7+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
38
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
49
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
510
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
611
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
712
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
8-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
13+
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
14+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
15+
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
16+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
17+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
18+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
19+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
20+
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
21+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
22+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
23+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
24+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
25+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
26+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
27+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
28+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
29+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
30+
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
31+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
32+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
33+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
34+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
35+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
36+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
37+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
38+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
39+
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
40+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
41+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
42+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
943
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
44+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
45+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1046
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
1147
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

options_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pipeline
22

33
import (
4+
"context"
45
"errors"
56
"testing"
67

@@ -11,7 +12,7 @@ func TestPipeline_WithOptions(t *testing.T) {
1112
t.Run("DisableErrorWrapping", func(t *testing.T) {
1213
p := NewPipeline().WithOptions(DisableErrorWrapping)
1314
p.WithSteps(
14-
NewStepFromFunc("disabled error wrapping", func(_ Context) error {
15+
NewStepFromFunc("disabled error wrapping", func(_ context.Context) error {
1516
return errors.New("some error")
1617
}),
1718
)

parallel/fanout.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package parallel
22

33
import (
4+
"context"
45
"sync"
56

67
pipeline "github.com/ccremer/go-command-pipeline"
@@ -11,31 +12,33 @@ NewFanOutStep creates a pipeline step that runs nested pipelines in their own Go
1112
The function provided as PipelineSupplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever.
1213
The step waits until all pipelines are finished.
1314
If the given ResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
14-
The given pipelines have to define their own pipeline.Context, it's not passed "down" from parent pipeline.
15-
However, The pipeline.Context for the ResultHandler will be the one from parent pipeline.
15+
16+
If the context is canceled, no new pipelines will be retrieved from the channel and the PipelineSupplier is expected to stop supplying new instances.
17+
Also, once canceled, the step waits for the remaining children pipelines and collects their result via given ResultHandler.
18+
However, the error returned from ResultHandler is wrapped in context.Canceled.
1619
*/
1720
func NewFanOutStep(name string, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step {
1821
step := pipeline.Step{Name: name}
19-
step.F = func(ctx pipeline.Context) pipeline.Result {
22+
step.F = func(ctx context.Context) pipeline.Result {
2023
pipelineChan := make(chan *pipeline.Pipeline)
2124
m := sync.Map{}
2225
var wg sync.WaitGroup
2326
i := uint64(0)
2427

25-
go pipelineSupplier(pipelineChan)
28+
go pipelineSupplier(ctx, pipelineChan)
2629
for pipe := range pipelineChan {
2730
p := pipe
2831
wg.Add(1)
2932
n := i
3033
i++
3134
go func() {
3235
defer wg.Done()
33-
m.Store(n, p.Run())
36+
m.Store(n, p.RunWithContext(ctx))
3437
}()
3538
}
36-
3739
wg.Wait()
38-
return collectResults(ctx, handler, &m)
40+
res := collectResults(ctx, handler, &m)
41+
return setResultErrorFromContext(ctx, res)
3942
}
4043
return step
4144
}

0 commit comments

Comments
 (0)