Skip to content

Commit

Permalink
The library has been rewritten to work with generics and with simplif…
Browse files Browse the repository at this point in the history
…ied semantics.
  • Loading branch information
YN committed Dec 13, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 4259fb6 commit 7c39621
Showing 8 changed files with 305 additions and 314 deletions.
21 changes: 10 additions & 11 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -8,8 +8,6 @@ linters-settings:
funlen:
lines: 100
statements: 50
gci:
local-prefixes: github.com/golangci/golangci-lint
goconst:
min-len: 2
min-occurrences: 2
@@ -38,12 +36,10 @@ linters-settings:

linters:
enable:
- deadcode
- depguard
- dogsled
- dupl
- errcheck
- exportloopref
- exhaustive
- funlen
- gochecknoinits
@@ -52,7 +48,6 @@ linters:
- gocyclo
- gofmt
- goimports
- gomnd
- goprintffuncname
- gosec
- gosimple
@@ -64,19 +59,16 @@ linters:
- nolintlint
- rowserrcheck
- staticcheck
- structcheck
- stylecheck
- typecheck
- unconvert
- unparam
- unused
- varcheck
- whitespace
- asciicheck
- gochecknoglobals
- gocognit
- godox
- goerr113
- nestif
- prealloc
- revive
@@ -91,11 +83,9 @@ linters:
- predeclared
- tparallel
disable:
- exhaustivestruct
- paralleltest
- bodyclose
- godot
- ifshort
- noctx
- sqlclosecheck
- testpackage
@@ -104,4 +94,13 @@ issues:
exclude-rules:
- path: _test\.go
linters:
- gosec
- gosec

- path: _test\.go$
linters:
- revive
text: "empty-block"
- path: _test\.go$
linters:
- depguard
text: "not allowed from list 'Main'"
126 changes: 45 additions & 81 deletions README.md
Original file line number Diff line number Diff line change
@@ -24,127 +24,91 @@ Installation
------------

```
go get github.com/nazar256/parapipe
go get -u github.com/nazar256/parapipe@latest
```

Usage
-----

1. Create a pipeline
1. Create a pipeline with first step. Processing callback is generic (so as the pipeline).
It may receive and return any type of data, but the second return value should always be a boolean.

```go
cfg := parapipe.Config{
ProcessErrors: false, // messages implementing "error" interface will not be passed to subsequent workers
}
pipeline := parapipe.NewPipeline(cfg)
concurrency := runtime.NumCPU() // how many messages to process concurrently for each pipe
pipeline := parapipe.NewPipeline(concurrency, func(msg YourInputType) (YourOutputType, bool) {
// do something and generate a new value "someValue"
shouldProceedWithNextStep := true
return someValue, shouldProceedWithNextStep
})
```

2. Add pipes - call `Pipe()` method one or more times
2. Add pipes - call `Attach()` function one or more times to add steps to the pipeline
```go
concurrency := 5 // how many messages to process concurrently for each pipe
pipeline.Pipe(concurrency, func(msg interface{}) interface{} {
typedMsg := msg.(YourInputType) // assert your type for the message
// do something and generate a new value "someValue"
return someValue
p1 := parapipe.NewPipeline(runtime.NumCPU(), func(msg int) (int, bool) {
time.Sleep(30 * time.Millisecond)
return msg + 1000, true
})
```
p2 := parapipe.Attach(p1, parapipe.NewPipeline(concurrency, func(msg int) (string, bool) {
time.Sleep(30 * time.Millisecond)
return strconv.Itoa(msg), true
}))

// final pipeline you are going to work with (push messages and read output)
pipeline := parapipe.Attach(p2, parapipe.NewPipeline(concurrency, func(msg string) (string, bool) {
time.Sleep(30 * time.Millisecond)
return "#" + msg, true
}))
```

3. Get "out" channel when all pipes are added and read results from it
```go
for result := range pipeline.Out() {
typedResut := result.(YourResultType)
// do something with the result
}
```
It's **important** to read everything from "out" even when the pipeline won't produce any viable result.
It will be stuck otherwise.
It's **important** to drain the pipeline (read everything from "out") even when the pipeline won't produce any viable result.
It could be stuck otherwise.

4. Push values for processing into the pipeline:
```go
pipeline.Push("something")
```

5. Close pipeline to clean up its resources and close its output channel after the last message.
All internal channels, goroutines, including `Out()` channel will be closed in a cascade.
5. Close pipeline to after the last message. This will cleanup its resources and close its output channel.
It's not recommended closing pipeline using `defer` because you may not want to hang output util defer is executed.
```go
pipeline.Close()
```

### Error handling
### Circuit breaking

In some cases (errors) there could be impossible to process a message, thus there is no way to pass it further.
In such case just return `false` as a second return value from the step processing callback.
The first value will be ignored.

To handle errors just return them as a result then listen to them on Out.
By default, errors will not be processed by subsequent stages.
```go
pipeline.Pipe(4, func(msg interface{}) interface{} {
inputValue := msg.(YourInputType) // assert your type for the message
pipeline.Pipe(4, func(inputValue InputType) (OutputType, bool) {
someValue, err := someOperation(inputValue)
if err != nil {
return err // error can also be a result and can be returned from a pipeline stage (pipe)
// handle the error
// slog.Error("error when calling someOperation", "err", err)
return someValue, false
}
return someValue
return someValue, true
})
// ...
for result := range pipeline.Out() {
err := result.(error)
if err != nil {
// handle the error
// you may want to stop sending new values to the pipeline in your own way and do close(pipeline.In())
}
typedResut := result.(YourResultType)
// do something with the result
}
```

Optionally you may allow passing errors to subsequent pipes.
For example, if you do not wish to stop the pipeline on errors, but rather process them in subsequent pipes.
```go
cfg := parapipe.Config{
ProcessErrors: true, // messages implementing "error" interface will be passed to subsequent workers as any message
}
concurrency := 5 // how many messages to process concurrently for each pipe

pipeline := parapipe.NewPipeline(cfg).
Pipe(concurrency, func(msg interface{}) interface{} {
inputValue := msg.(YourInputType) // assert your type for the message
someValue, err := someOperation(inputValue)
if err != nil {
return err // error can also be a result and can be returned from a pipeline stage (pipe)
}
return someValue
}).
Pipe(concurrency, func(msg interface{}) interface{} {
switch inputValue := msg.(type) {
case error:
// process error
case YourNormalExpectedType:
// process message normally
}
})
```

### Limitations

* `Out()` method can be used only once on each pipeline. Any subsequent `Pipe()` call will cause panic.
Though, when you need to stream values somewhere from the middle of the pipeline - just send them to your own channel.
* do not try to `Push` to the pipeline before the first `Pipe` is defined - it will panic
* as at the time of writing Go does not have generics, you have to assert the type for incoming messages in pipes explicitly,
which means the type of the message can be checked in runtime only.

### Performance

As already was mentioned, parapipe makes use of `interface{}` and also executes callbacks in a separate goroutine per each message.
This can have a great performance impact because of heap allocation and creation of goroutines.
For instance if you try to stream a slice of integers, each of them will be converted to an interface type and
will likely be allocated in heap.
Moreover, if an execution time of each step is relatively small,
than a goroutine creation may decrease overall performance considerably.

If the performance is the priority, its recommended that you pack such messages in batches (i.e. slices)
and stream that batches instead.
Obviously that's your responsibility to process batch in the order you like inside step (pipe) callback.

Basically the overall recommendations for choosing batch size are in general the same as if you have to create a slice of interfaces
or create a new goroutine.
Parapipe makes use of generics and channels.
Overall it should be performant enough for most of the cases.
It has zero heap allocations in hot code, thus generates little load for garbage collector.
However, it uses channels under the hood and is bottlenecked mostly by the channel operations which are several
writes and reads per each message.

Examples
--------
@@ -159,7 +123,7 @@ See the [working example of using parapipe in AMQP client](http://github.com/naz

With parapipe you can:

* respond a JSON-feed as stream, retrieve, enrich and marshal each object concurrently, in maintained order and return them to the client
* in your API respond a long JSON-feed as stream, retrieve, enrich and marshal each object concurrently, in maintained order and return them to the client
* fetch and merge entries from different sources as one stream
* structure your HTTP-controllers
* structure your API controllers or handlers
* processing heavy files in effective way
9 changes: 5 additions & 4 deletions gen_data_test.go
Original file line number Diff line number Diff line change
@@ -2,20 +2,21 @@ package parapipe_test

import "github.com/nazar256/parapipe"

func makeRange(min, max int) []int {
a := make([]int, max-min+1)
func makeRange(start, end int) []int {
a := make([]int, end-start+1)
for i := range a {
a[i] = min + i
a[i] = start + i
}

return a
}

func feedPipeline(pipeline *parapipe.Pipeline, amount int) {
func feedPipeline[T any](pipeline *parapipe.Pipeline[int, T], amount int) {
go func() {
for i := 0; i < amount; i++ {
pipeline.Push(i)
}

pipeline.Close()
}()
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
module github.com/nazar256/parapipe

go 1.18

Loading

0 comments on commit 7c39621

Please sign in to comment.