
A taskflow-like General-purpose Task-parallel Programming Framework with integrated visualizer and profiler


noneback/go-taskflow


Go-Taskflow

codecov Go Reference Go Report Card Mentioned in Awesome Go

A taskflow-like, general-purpose task-parallel programming framework for Go, inspired by taskflow-cpp. It builds on Go's native concurrency and simplicity and is well suited to managing complex dependencies among concurrent tasks.

Features

  • High extensibility: Easily extend the framework to adapt to various specific use cases.

  • Native Go concurrency model: Leverages goroutines to manage concurrent task execution.

  • User-friendly programming interface: Simplifies complex task dependency management in Go.

  • Static/Subflow/Conditional/Cyclic tasking: Define static tasks, condition nodes, nested subflows, and cyclic flows to improve modularity and programmability.

  • Priority task scheduling: Assign priorities to tasks; higher-priority tasks are scheduled first.

  • Built-in visualization & profiling tools: Generate visual representations of tasks and profile task execution performance using integrated tools, making debugging and optimization easier.

Use Cases

  • Data Pipeline: Orchestrate data processing stages that have complex dependencies.

  • Workflow Automation: Define and run automation workflows where tasks have a clear sequence and dependency structure.

  • Parallel Tasking: Execute independent tasks concurrently to fully utilize CPU resources.

Example

Install the latest version: go get -u github.com/noneback/go-taskflow

package main

import (
	"fmt"
	"log"
	"os"
	"runtime"
	"time"

	gotaskflow "github.com/noneback/go-taskflow"
)

func main() {
	// 1. Create an executor (one worker per CPU minus one, but never zero).
	executor := gotaskflow.NewExecutor(uint(max(runtime.NumCPU()-1, 1)))

	// 2. Create the tasks you need and arrange their dependencies as a DAG.
	tf := gotaskflow.NewTaskFlow("G")
	A, B, C :=
		gotaskflow.NewTask("A", func() {
			fmt.Println("A")
		}),
		gotaskflow.NewTask("B", func() {
			fmt.Println("B")
		}),
		gotaskflow.NewTask("C", func() {
			fmt.Println("C")
		})

	// A1 has high priority, so it is scheduled ahead of other ready tasks.
	A1, B1, C1 :=
		gotaskflow.NewTask("A1", func() {
			fmt.Println("A1")
		}).Priority(gotaskflow.HIGH),
		gotaskflow.NewTask("B1", func() {
			fmt.Println("B1")
		}),
		gotaskflow.NewTask("C1", func() {
			fmt.Println("C1")
		})

	// A, C and A1 run before B; A1 and B1 run before C.
	A.Precede(B)
	C.Precede(B)
	A1.Precede(B)
	C.Succeed(A1)
	C.Succeed(B1)

	// A subflow groups a nested graph of tasks, pushed into sf.
	subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
		A2, B2, C2 :=
			gotaskflow.NewTask("A2", func() {
				fmt.Println("A2")
			}),
			gotaskflow.NewTask("B2", func() {
				fmt.Println("B2")
			}),
			gotaskflow.NewTask("C2", func() {
				fmt.Println("C2")
			})
		A2.Precede(B2)
		C2.Precede(B2)
		sf.Push(A2, B2, C2)
	})

	subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
		A3, B3, C3 :=
			gotaskflow.NewTask("A3", func() {
				fmt.Println("A3")
			}),
			gotaskflow.NewTask("B3", func() {
				fmt.Println("B3")
			}),
			gotaskflow.NewTask("C3", func() {
				fmt.Println("C3")
			})
		A3.Precede(B3)
		C3.Precede(B3)
		sf.Push(A3, B3, C3)
	})

	// A condition task returns the index of the successor to run:
	// 0 selects subflow, 1 selects subflow2.
	cond := gotaskflow.NewCondition("binary", func() uint {
		return uint(time.Now().Second() % 2)
	})
	B.Precede(cond)
	cond.Precede(subflow, subflow2)

	// 3. Push all nodes into the taskflow.
	tf.Push(A, B, C)
	tf.Push(A1, B1, C1, cond, subflow, subflow2)

	// 4. Run the taskflow with the executor and wait for it to finish.
	executor.Run(tf).Wait()

	// Visualize the DAG if you need to check its structure.
	if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
		log.Fatal(err)
	}
	// Profile it if you need to see which task takes the most time.
	if err := executor.Profile(os.Stdout); err != nil {
		log.Fatal(err)
	}
}

The next example sorts ten slices of random integers in parallel, merges each sorted slice into a shared result, and checks the result in a final Done task:

package main

import (
	"fmt"
	"log"
	"math/rand"
	"os"
	"slices"
	"strconv"
	"sync"

	gtf "github.com/noneback/go-taskflow"
)

// mergeInto merges the sorted slice src into the sorted slice dest
// and returns the merged, sorted result.
func mergeInto(dest, src []int) []int {
	size := len(dest) + len(src)
	tmp := make([]int, 0, size)
	i, j := 0, 0
	for i < len(dest) && j < len(src) {
		if dest[i] < src[j] {
			tmp = append(tmp, dest[i])
			i++
		} else {
			tmp = append(tmp, src[j])
			j++
		}
	}
	// Append whatever remains of the slice that was not exhausted.
	if i < len(dest) {
		tmp = append(tmp, dest[i:]...)
	} else {
		tmp = append(tmp, src[j:]...)
	}
	return tmp
}

func main() {
	size := 100
	randomArr := make([][]int, 10)
	sortedArr := make([]int, 0, 10*size)
	mutex := &sync.Mutex{}

	// Fill 10 slices with size random integers each.
	for i := 0; i < 10; i++ {
		for j := 0; j < size; j++ {
			randomArr[i] = append(randomArr[i], rand.Int())
		}
	}

	sortTasks := make([]*gtf.Task, 10)
	tf := gtf.NewTaskFlow("merge sort")

	// Done runs after every sort task and verifies the merged result.
	done := gtf.NewTask("Done", func() {
		if !slices.IsSorted(sortedArr) {
			log.Fatal("Failed")
		}
		fmt.Println("Sorted")
		fmt.Println(sortedArr[:1000])
	})

	for i := 0; i < 10; i++ {
		i := i // per-iteration copy for the closure (required before Go 1.22)
		sortTasks[i] = gtf.NewTask("sort_"+strconv.Itoa(i), func() {
			// Sort this task's slice, then merge it into the shared result.
			arr := randomArr[i]
			slices.Sort(arr)
			mutex.Lock()
			defer mutex.Unlock()
			sortedArr = mergeInto(sortedArr, arr)
		})
	}

	done.Succeed(sortTasks...)
	tf.Push(sortTasks...)
	tf.Push(done)

	executor := gtf.NewExecutor(1000)
	executor.Run(tf).Wait()

	if err := gtf.Visualize(tf, os.Stdout); err != nil {
		log.Fatal("V->", err)
	}
	if err := executor.Profile(os.Stdout); err != nil {
		log.Fatal("P->", err)
	}
}

Understand Condition Task Correctly

The condition node is special in taskflow-cpp: it is used not only for conditional control flow but also for looping.

go-taskflow keeps almost the same behavior, so read ConditionTasking to avoid common pitfalls.

Error Handling in go-taskflow

Errors in Go are values, so it is the user's job to handle them correctly.

Only unrecovered panics need to be handled by the framework. Currently, if one occurs, the whole parent graph is canceled and the remaining tasks are left undone. This behavior may change in the future; if you have ideas, feel free to let me know.
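The cancel-on-panic behavior can be modeled with recover and context.WithCancel from the standard library. This is a simplified, sequential sketch under stated assumptions: runAll is a hypothetical helper (not go-taskflow's executor), tasks run one after another, and names and fns have the same length. Once a task panics, the panic is recovered, the shared context is canceled, and tasks that have not started are skipped.

```go
package main

import (
	"context"
	"fmt"
)

// runAll runs fns in order under a shared context. If a task panics, the panic
// is recovered, the context is canceled, and every task that has not started
// is skipped. It returns the names of tasks that completed and the recovered
// panic value (nil if no task panicked). names and fns must have equal length.
func runAll(names []string, fns []func()) (completed []string, panicked any) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for i, fn := range fns {
		if ctx.Err() != nil {
			break // graph canceled: leave the remaining tasks undone
		}
		func() {
			defer func() {
				if r := recover(); r != nil {
					panicked = r
					cancel()
				}
			}()
			fn()
			completed = append(completed, names[i]) // reached only if fn returned normally
		}()
	}
	return completed, panicked
}

func main() {
	done, p := runAll(
		[]string{"A", "B", "C"},
		[]func(){
			func() {},
			func() { panic("B failed") },
			func() {},
		},
	)
	fmt.Println(done, p) // [A] B failed
}
```

In a concurrent executor the same context would also be passed to tasks already in flight, so long-running work could observe ctx.Done() and stop early.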

How to visualize a taskflow

if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
		log.Fatal(err)
}

Visualize writes the taskflow as raw text in DOT format; use dot (Graphviz) to render the DAG as an SVG.


How to profile a taskflow

if err := executor.Profile(os.Stdout); err != nil {
		log.Fatal(err)
}

Profile writes raw text in flamegraph format; use flamegraph to render it as an SVG flame graph.


What's more

Feature requests and discussions are welcome.
