Skip to content

Commit c06da67

Browse files
authored
Merge pull request #36 from ccremer/package
Restructure packages
2 parents 2b1d53b + d6b1fc7 commit c06da67

21 files changed

+288
-294
lines changed

README.md

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ Small Go utility that executes business actions in a pipeline.
1313
import (
1414
"context"
1515
pipeline "github.com/ccremer/go-command-pipeline"
16-
"github.com/ccremer/go-command-pipeline/predicate"
1716
)
1817

1918
type Data struct {
@@ -24,7 +23,7 @@ func main() {
2423
data := &Data // define arbitrary data to pass around in the steps.
2524
p := pipeline.NewPipeline()
2625
p.WithSteps(
27-
pipeline.NewStep("define random number", defineNumber),
26+
pipeline.NewStepFromFunc("define random number", defineNumber),
2827
pipeline.NewStepFromFunc("print number", printNumber),
2928
)
3029
result := p.RunWithContext(context.WithValue(context.Background, "data", data))
@@ -33,9 +32,9 @@ func main() {
3332
}
3433
}
3534

36-
func defineNumber(ctx context.Context) pipeline.Result {
35+
func defineNumber(ctx context.Context) error {
3736
ctx.Value("data").(*Data).Number = 10
38-
return pipeline.Result{}
37+
return nil
3938
}
4039

4140
// Let's assume this is a business function that can fail.
@@ -76,18 +75,18 @@ It could be simplified to something like this:
7675
```go
7776
func Persist(data *Data) error {
7877
p := pipeline.NewPipeline().WithSteps(
79-
pipeline.NewStep("prepareTransaction", prepareTransaction()),
80-
pipeline.NewStep("executeQuery", executeQuery()),
81-
pipeline.NewStep("commitTransaction", commit()),
78+
pipeline.NewStepFromFunc("prepareTransaction", prepareTransaction()),
79+
pipeline.NewStepFromFunc("executeQuery", executeQuery()),
80+
pipeline.NewStepFromFunc("commitTransaction", commit()),
8281
)
8382
return p.RunWithContext(context.WithValue(context.Background(), myKey, data).Err
8483
}
8584

86-
func executeQuery() pipeline.ActionFunc {
87-
return func(ctx context.Context) pipeline.Result {
85+
func executeQuery() error {
86+
return func(ctx context.Context) error {
8887
data := ctx.Value(myKey).(*Data)
8988
err := database.executeQuery("SOME QUERY", data)
90-
return pipeline.Result{Err: err}
89+
return err
9190
)
9291
}
9392
...

examples/abort_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ func TestExample_Abort(t *testing.T) {
1919
pipeline.NewStepFromFunc("never executed", doNotExecute),
2020
)
2121
result := p.Run()
22-
assert.True(t, result.IsSuccessful())
22+
assert.True(t, result.IsCompleted())
2323
assert.True(t, result.IsAborted())
24+
assert.False(t, result.IsSuccessful())
2425
}
2526

2627
func doNotExecute(_ context.Context) error {

examples/context_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestExample_Context(t *testing.T) {
2222
// Create pipeline with defaults
2323
p := pipeline.NewPipeline()
2424
p.WithSteps(
25-
pipeline.NewStep("define random number", defineNumber),
25+
pipeline.NewStepFromFunc("define random number", defineNumber),
2626
pipeline.NewStepFromFunc("print number", printNumber),
2727
)
2828
result := p.RunWithContext(context.WithValue(context.Background(), key, &Data{}))
@@ -31,9 +31,9 @@ func TestExample_Context(t *testing.T) {
3131
}
3232
}
3333

34-
func defineNumber(ctx context.Context) pipeline.Result {
34+
func defineNumber(ctx context.Context) error {
3535
ctx.Value(key).(*Data).Number = rand.Int()
36-
return pipeline.Result{}
36+
return nil
3737
}
3838

3939
func printNumber(ctx context.Context) error {

examples/git_test.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@ import (
1111
"testing"
1212

1313
pipeline "github.com/ccremer/go-command-pipeline"
14-
"github.com/ccremer/go-command-pipeline/predicate"
1514
)
1615

1716
func TestExample_Git(t *testing.T) {
1817
p := pipeline.NewPipeline()
1918
p.WithSteps(
20-
predicate.ToStep("clone repository", CloneGitRepository(), predicate.Not(DirExists("my-repo"))),
21-
pipeline.NewStep("checkout branch", CheckoutBranch()),
22-
pipeline.NewStep("pull", Pull()).WithResultHandler(logSuccess),
19+
CloneGitRepository(),
20+
CheckoutBranch(),
21+
Pull().WithResultHandler(logSuccess),
2322
)
2423
result := p.Run()
2524
if !result.IsSuccessful() {
@@ -28,29 +27,29 @@ func TestExample_Git(t *testing.T) {
2827
}
2928

3029
func logSuccess(_ context.Context, result pipeline.Result) error {
31-
log.Println("handler called")
30+
log.Println("handler called", result.Name())
3231
return result.Err()
3332
}
3433

35-
func CloneGitRepository() pipeline.ActionFunc {
36-
return func(_ context.Context) pipeline.Result {
34+
func CloneGitRepository() pipeline.Step {
35+
return pipeline.ToStep("clone repository", func(_ context.Context) error {
3736
err := execGitCommand("clone", "git@github.com/ccremer/go-command-pipeline")
38-
return pipeline.NewResultWithError("clone repository", err)
39-
}
37+
return err
38+
}, pipeline.Not(DirExists("my-repo")))
4039
}
4140

42-
func Pull() pipeline.ActionFunc {
43-
return func(_ context.Context) pipeline.Result {
41+
func Pull() pipeline.Step {
42+
return pipeline.NewStepFromFunc("pull", func(_ context.Context) error {
4443
err := execGitCommand("pull")
45-
return pipeline.NewResultWithError("pull", err)
46-
}
44+
return err
45+
})
4746
}
4847

49-
func CheckoutBranch() pipeline.ActionFunc {
50-
return func(_ context.Context) pipeline.Result {
48+
func CheckoutBranch() pipeline.Step {
49+
return pipeline.NewStepFromFunc("checkout branch", func(_ context.Context) error {
5150
err := execGitCommand("checkout", "master")
52-
return pipeline.NewResultWithError("checkout branch", err)
53-
}
51+
return err
52+
})
5453
}
5554

5655
func execGitCommand(args ...string) error {
@@ -61,7 +60,7 @@ func execGitCommand(args ...string) error {
6160
return err
6261
}
6362

64-
func DirExists(path string) predicate.Predicate {
63+
func DirExists(path string) pipeline.Predicate {
6564
return func(_ context.Context) bool {
6665
if info, err := os.Stat(path); err != nil || !info.IsDir() {
6766
return false

fanout.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package pipeline
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
/*
9+
NewFanOutStep creates a pipeline step that runs nested pipelines in their own Go routines.
10+
The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever.
11+
The step waits until all pipelines are finished.
12+
If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
13+
14+
If the context is canceled, no new pipelines will be retrieved from the channel and the Supplier is expected to stop supplying new instances.
15+
Also, once canceled, the step waits for the remaining children pipelines and collects their result via given ParallelResultHandler.
16+
However, the error returned from ParallelResultHandler is wrapped in context.Canceled.
17+
*/
18+
func NewFanOutStep(name string, pipelineSupplier Supplier, handler ParallelResultHandler) Step {
19+
step := Step{Name: name}
20+
step.F = func(ctx context.Context) Result {
21+
pipelineChan := make(chan *Pipeline)
22+
m := sync.Map{}
23+
var wg sync.WaitGroup
24+
i := uint64(0)
25+
26+
go pipelineSupplier(ctx, pipelineChan)
27+
for pipe := range pipelineChan {
28+
p := pipe
29+
wg.Add(1)
30+
n := i
31+
i++
32+
go func() {
33+
defer wg.Done()
34+
m.Store(n, p.RunWithContext(ctx))
35+
}()
36+
}
37+
wg.Wait()
38+
res := collectResults(ctx, handler, &m)
39+
return setResultErrorFromContext(ctx, name, res)
40+
}
41+
return step
42+
}

parallel/fanout_test.go renamed to fanout_test.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package parallel
1+
package pipeline
22

33
import (
44
"context"
@@ -7,7 +7,6 @@ import (
77
"testing"
88
"time"
99

10-
pipeline "github.com/ccremer/go-command-pipeline"
1110
"github.com/stretchr/testify/assert"
1211
"go.uber.org/goleak"
1312
)
@@ -16,7 +15,7 @@ func TestNewFanOutStep(t *testing.T) {
1615
var counts uint64
1716
tests := map[string]struct {
1817
jobs int
19-
givenResultHandler ResultHandler
18+
givenResultHandler ParallelResultHandler
2019
returnErr error
2120
expectedCounts int
2221
}{
@@ -31,9 +30,9 @@ func TestNewFanOutStep(t *testing.T) {
3130
"GivenPipelineWith_WhenRunningStep_ThenReturnSuccessButRunErrorHandler": {
3231
jobs: 1,
3332
returnErr: fmt.Errorf("should be called"),
34-
givenResultHandler: func(ctx context.Context, _ map[uint64]pipeline.Result) pipeline.Result {
33+
givenResultHandler: func(ctx context.Context, _ map[uint64]Result) error {
3534
atomic.AddUint64(&counts, 1)
36-
return pipeline.Result{}
35+
return nil
3736
},
3837
expectedCounts: 2,
3938
},
@@ -44,15 +43,15 @@ func TestNewFanOutStep(t *testing.T) {
4443
goleak.VerifyNone(t)
4544
handler := tt.givenResultHandler
4645
if handler == nil {
47-
handler = func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
46+
handler = func(ctx context.Context, results map[uint64]Result) error {
4847
assert.NoError(t, results[0].Err())
49-
return pipeline.Result{}
48+
return nil
5049
}
5150
}
52-
step := NewFanOutStep("fanout", func(_ context.Context, funcs chan *pipeline.Pipeline) {
51+
step := NewFanOutStep("fanout", func(_ context.Context, funcs chan *Pipeline) {
5352
defer close(funcs)
5453
for i := 0; i < tt.jobs; i++ {
55-
funcs <- pipeline.NewPipeline().WithSteps(pipeline.NewStepFromFunc("step", func(_ context.Context) error {
54+
funcs <- NewPipeline().WithSteps(NewStepFromFunc("step", func(_ context.Context) error {
5655
atomic.AddUint64(&counts, 1)
5756
return tt.returnErr
5857
}))
@@ -68,36 +67,36 @@ func TestNewFanOutStep(t *testing.T) {
6867
func TestNewFanOutStep_Cancel(t *testing.T) {
6968
defer goleak.VerifyNone(t)
7069
var counts uint64
71-
step := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *pipeline.Pipeline) {
70+
step := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *Pipeline) {
7271
defer close(pipelines)
7372
for i := 0; i < 10000; i++ {
7473
select {
7574
case <-ctx.Done():
7675
return
7776
default:
78-
pipelines <- pipeline.NewPipeline().WithSteps(pipeline.NewStepFromFunc("increase", func(_ context.Context) error {
77+
pipelines <- NewPipeline().WithSteps(NewStepFromFunc("increase", func(_ context.Context) error {
7978
atomic.AddUint64(&counts, 1)
8079
return nil
8180
}))
8281
time.Sleep(10 * time.Millisecond)
8382
}
8483
}
8584
t.Fail() // should not reach this
86-
}, func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
85+
}, func(ctx context.Context, results map[uint64]Result) error {
8786
assert.Len(t, results, 3)
88-
return pipeline.NewResultWithError("fanout", fmt.Errorf("some error"))
87+
return fmt.Errorf("some error")
8988
})
9089
ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
9190
defer cancel()
92-
result := pipeline.NewPipeline().WithSteps(step).RunWithContext(ctx)
91+
result := NewPipeline().WithSteps(step).RunWithContext(ctx)
9392
assert.Equal(t, 3, int(counts))
9493
assert.True(t, result.IsCanceled(), "canceled flag")
9594
assert.EqualError(t, result.Err(), `step "fanout" failed: context deadline exceeded, collection error: some error`)
9695
}
9796

9897
func ExampleNewFanOutStep() {
99-
p := pipeline.NewPipeline()
100-
fanout := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *pipeline.Pipeline) {
98+
p := NewPipeline()
99+
fanout := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *Pipeline) {
101100
defer close(pipelines)
102101
// create some pipelines
103102
for i := 0; i < 3; i++ {
@@ -106,20 +105,20 @@ func ExampleNewFanOutStep() {
106105
case <-ctx.Done():
107106
return // parent pipeline has been canceled, let's not create more pipelines.
108107
default:
109-
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
108+
pipelines <- NewPipeline().AddStep(NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
110109
time.Sleep(time.Duration(n * 10000000)) // fake some load
111110
fmt.Println(fmt.Sprintf("I am worker %d", n))
112111
return nil
113112
}))
114113
}
115114
}
116-
}, func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
115+
}, func(ctx context.Context, results map[uint64]Result) error {
117116
for worker, result := range results {
118117
if result.IsFailed() {
119118
fmt.Println(fmt.Sprintf("Worker %d failed: %v", worker, result.Err()))
120119
}
121120
}
122-
return pipeline.Result{}
121+
return nil
123122
})
124123
p.AddStep(fanout)
125124
p.Run()

parallel/fanout.go

Lines changed: 0 additions & 44 deletions
This file was deleted.

0 commit comments

Comments
 (0)