Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,39 @@ Small Go utility that executes business actions in a pipeline.

```go
import (
"context"
pipeline "github.com/ccremer/go-command-pipeline"
"github.com/ccremer/go-command-pipeline/predicate"
)

type Data struct {
Number int
}

func main() {
number := 0 // define arbitrary data to pass around in the steps.
data := &Data // define arbitrary data to pass around in the steps.
p := pipeline.NewPipeline()
p.WithContext(&number)
p.WithSteps(
pipeline.NewStep("define random number", defineNumber),
pipeline.NewStepFromFunc("print number", printNumber),
)
result := p.Run()
result := p.RunWithContext(context.WithValue(context.Background, "data", data))
if !result.IsSuccessful() {
log.Fatal(result.Err)
}
}

func defineNumber(ctx pipeline.Context) pipeline.Result {
ctx.(*int) = 10
func defineNumber(ctx context.Context) pipeline.Result {
ctx.Value("data").(*Data).Number = 10
return pipeline.Result{}
}

// Let's assume this is a business function that can fail.
// You can enable "automatic" fail-on-first-error pipelines by having more small functions that return errors.
func printNumber(ctx pipeline.Context) error {
number := ctx.(*int)
_, err := fmt.Println(*number)
return err
func printNumber(ctx context.Context) error {
number := ctx.Value("data").(*Data).Number
fmt.Println(number)
return nil
}
```

Expand Down Expand Up @@ -70,18 +74,18 @@ We have tons of `if err != nil` that bloats the function with more error handlin

It could be simplified to something like this:
```go
func Persist(data Data) error {
p := pipeline.NewPipeline().WithContext(data).WithSteps(
func Persist(data *Data) error {
p := pipeline.NewPipeline().WithSteps(
pipeline.NewStep("prepareTransaction", prepareTransaction()),
pipeline.NewStep("executeQuery", executeQuery()),
pipeline.NewStep("commitTransaction", commit()),
)
return p.Run().Err
return p.RunWithContext(context.WithValue(context.Background(), myKey, data).Err
}

func executeQuery() pipeline.ActionFunc {
return func(ctx pipeline.Context) pipeline.Result {
data := ctx.(Data)
return func(ctx context.Context) pipeline.Result {
data := ctx.Value(myKey).(*Data)
err := database.executeQuery("SOME QUERY", data)
return pipeline.Result{Err: err}
)
Expand Down
5 changes: 3 additions & 2 deletions examples/abort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package examples

import (
"context"
"errors"
"testing"

Expand All @@ -22,11 +23,11 @@ func TestExample_Abort(t *testing.T) {
assert.True(t, result.IsAborted())
}

func doNotExecute(_ pipeline.Context) error {
func doNotExecute(_ context.Context) error {
return errors.New("should not execute")
}

func abort(_ pipeline.Context) error {
func abort(_ context.Context) error {
// some logic that can handle errors, but you don't want to bubble up the error.

// terminate pipeline gracefully
Expand Down
14 changes: 8 additions & 6 deletions examples/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package examples

import (
"context"
"fmt"
"math/rand"
"testing"
Expand All @@ -15,26 +16,27 @@ type Data struct {
Number int
}

var key = struct{}{}

func TestExample_Context(t *testing.T) {
// Create pipeline with defaults
p := pipeline.NewPipeline()
p.WithContext(&Data{})
p.WithSteps(
pipeline.NewStep("define random number", defineNumber),
pipeline.NewStepFromFunc("print number", printNumber),
)
result := p.Run()
result := p.RunWithContext(context.WithValue(context.Background(), key, &Data{}))
if !result.IsSuccessful() {
t.Fatal(result.Err)
}
}

func defineNumber(ctx pipeline.Context) pipeline.Result {
ctx.(*Data).Number = rand.Int()
func defineNumber(ctx context.Context) pipeline.Result {
ctx.Value(key).(*Data).Number = rand.Int()
return pipeline.Result{}
}

func printNumber(ctx pipeline.Context) error {
_, err := fmt.Println(ctx.(*Data).Number)
func printNumber(ctx context.Context) error {
_, err := fmt.Println(ctx.Value(key).(*Data).Number)
return err
}
11 changes: 6 additions & 5 deletions examples/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package examples

import (
"context"
"log"
"os"
"os/exec"
Expand All @@ -26,27 +27,27 @@ func TestExample_Git(t *testing.T) {
}
}

func logSuccess(ctx pipeline.Context, result pipeline.Result) error {
func logSuccess(_ context.Context, result pipeline.Result) error {
log.Println("handler called")
return result.Err
}

func CloneGitRepository() pipeline.ActionFunc {
return func(_ pipeline.Context) pipeline.Result {
return func(_ context.Context) pipeline.Result {
err := execGitCommand("clone", "git@github.com/ccremer/go-command-pipeline")
return pipeline.Result{Err: err}
}
}

func Pull() pipeline.ActionFunc {
return func(_ pipeline.Context) pipeline.Result {
return func(_ context.Context) pipeline.Result {
err := execGitCommand("pull")
return pipeline.Result{Err: err}
}
}

func CheckoutBranch() pipeline.ActionFunc {
return func(_ pipeline.Context) pipeline.Result {
return func(_ context.Context) pipeline.Result {
err := execGitCommand("checkout", "master")
return pipeline.Result{Err: err}
}
Expand All @@ -61,7 +62,7 @@ func execGitCommand(args ...string) error {
}

func DirExists(path string) predicate.Predicate {
return func(_ pipeline.Context) bool {
return func(_ context.Context) bool {
if info, err := os.Stat(path); err != nil || !info.IsDir() {
return false
}
Expand Down
11 changes: 5 additions & 6 deletions examples/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package examples

import (
"context"
"fmt"
"testing"

Expand All @@ -16,17 +17,15 @@ func TestExample_Hooks(t *testing.T) {
fmt.Println(fmt.Sprintf("Executing step: %s", step.Name))
})
p.WithSteps(
pipeline.NewStep("hook demo", AfterHookAction()),
pipeline.NewStepFromFunc("hook demo", AfterHookAction),
)
result := p.Run()
if !result.IsSuccessful() {
t.Fatal(result.Err)
}
}

func AfterHookAction() pipeline.ActionFunc {
return func(ctx pipeline.Context) pipeline.Result {
fmt.Println("I am called in an action after the hooks")
return pipeline.Result{}
}
func AfterHookAction(_ context.Context) error {
fmt.Println("I am called in an action after the hooks")
return nil
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module github.com/ccremer/go-command-pipeline

go 1.17

require github.com/stretchr/testify v1.7.0
require (
github.com/stretchr/testify v1.7.0
go.uber.org/goleak v1.1.12
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
Expand Down
38 changes: 37 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,47 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3 changes: 2 additions & 1 deletion options_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
"context"
"errors"
"testing"

Expand All @@ -11,7 +12,7 @@ func TestPipeline_WithOptions(t *testing.T) {
t.Run("DisableErrorWrapping", func(t *testing.T) {
p := NewPipeline().WithOptions(DisableErrorWrapping)
p.WithSteps(
NewStepFromFunc("disabled error wrapping", func(_ Context) error {
NewStepFromFunc("disabled error wrapping", func(_ context.Context) error {
return errors.New("some error")
}),
)
Expand Down
17 changes: 10 additions & 7 deletions parallel/fanout.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package parallel

import (
"context"
"sync"

pipeline "github.com/ccremer/go-command-pipeline"
Expand All @@ -11,31 +12,33 @@ NewFanOutStep creates a pipeline step that runs nested pipelines in their own Go
The function provided as PipelineSupplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever.
The step waits until all pipelines are finished.
If the given ResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
The given pipelines have to define their own pipeline.Context, it's not passed "down" from parent pipeline.
However, The pipeline.Context for the ResultHandler will be the one from parent pipeline.

If the context is canceled, no new pipelines will be retrieved from the channel and the PipelineSupplier is expected to stop supplying new instances.
Also, once canceled, the step waits for the remaining children pipelines and collects their result via given ResultHandler.
However, the error returned from ResultHandler is wrapped in context.Canceled.
*/
func NewFanOutStep(name string, pipelineSupplier PipelineSupplier, handler ResultHandler) pipeline.Step {
step := pipeline.Step{Name: name}
step.F = func(ctx pipeline.Context) pipeline.Result {
step.F = func(ctx context.Context) pipeline.Result {
pipelineChan := make(chan *pipeline.Pipeline)
m := sync.Map{}
var wg sync.WaitGroup
i := uint64(0)

go pipelineSupplier(pipelineChan)
go pipelineSupplier(ctx, pipelineChan)
for pipe := range pipelineChan {
p := pipe
wg.Add(1)
n := i
i++
go func() {
defer wg.Done()
m.Store(n, p.Run())
m.Store(n, p.RunWithContext(ctx))
}()
}

wg.Wait()
return collectResults(ctx, handler, &m)
res := collectResults(ctx, handler, &m)
return setResultErrorFromContext(ctx, res)
}
return step
}
Loading