API doc: https://pkg.go.dev/github.com/netobserv/gopipes
Go-pipes is a library that allows to dynamically connect multiple pipeline stages that are communicated via channels. Each stage will run in a goroutine.
This library is a selective fork of a community project: http://github.com/mariomac/pipes
This library allows wrapping functions within Nodes of a graph. In order to pass data across the nodes, each wrapped function must receive, as arguments, an input channel, an output channel, or both.
There are three types of nodes:
- Start node: each of the starting point of a graph. This is, all the nodes that bring information from outside the graph: e.g. because they generate them or because they acquire them from an external source like a Web Service. A graph must have at least one start node. A start node must be connected to at least one middle or terminal node.
- Middle node: any intermediate node that receives data from another node, processes/filters it, and forwards the data to another node. A Middle node must be connected to at least one middle or terminal node.
- Terminal node: any node that receives data from another node and does not forward it to another node, but can process it and send the results to outside the graph (e.g. memory, storage, web...)
The following pipeline has two Start nodes that send the data to two destination Middle
nodes (odds
and evens
). From there, the data follows their own branches until they
are eventually joined in the printer
Terminal node.
Check the complete examples in the examples/ folder).
func main() {
// Defining start, middle and terminal nodes that wrap some functions
start1 := node.AsStart(StartCounter)
start2 := node.AsStart(StartRandoms)
odds := node.AsMiddle(OddFilter)
evens := node.AsMiddle(EvenFilter)
oddsMsg := node.AsMiddle(Messager("odd number"))
evensMsg := node.AsMiddle(Messager("even number"))
printer := node.AsTerminal(Printer)
// Connecting nodes like:
//
// start1----\ /---start2
// | X |
// evens<---/ \-->odds
// | |
// evensMsg oddsMsg
// \ /
// printer
start1.SendsTo(evens, odds)
start2.SendsTo(evens, odds)
odds.SendsTo(oddsMsg)
evens.SendsTo(evensMsg)
oddsMsg.SendsTo(printer)
evensMsg.SendsTo(printer)
// all the Start nodes must be started to
// start forwarding data to the rest of the graph
start1.Start()
start2.Start()
// We can wait for terminal nodes to finish their execution
// after the rest of the graph has finished
<-printer.Done()
}
Output:
even number: 2
odd number: 847
odd number: 59
odd number: 81
odd number: 81
even number: 0
odd number: 3
odd number: 1
odd number: 887
even number: 4