Skip to content

Commit

Permalink
Merge pull request #2 from jantytgat/0.1.0-dev.20240926
Browse files Browse the repository at this point in the history
0.1.0 dev.20240926
  • Loading branch information
jantytgat authored Oct 22, 2024
2 parents c2bcdbe + 7844df1 commit a6aaf4d
Show file tree
Hide file tree
Showing 67 changed files with 2,455 additions and 113 deletions.
57 changes: 57 additions & 0 deletions examples/cron/simpleTicker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"fmt"
"time"

"github.com/jantytgat/go-jobs/pkg/cron"
)

func main() {
schedule := cron.EverySecond()
chOut := make(chan time.Time)
ticker := cron.NewTicker(schedule, chOut)

ctx, cancel := context.WithCancel(context.Background())
stop := context.AfterFunc(ctx, func() {
fmt.Println("Closing channel")
time.Sleep(1 * time.Second)
close(chOut)
})
defer stop()
defer cancel()

go func() {
for {
select {
case <-ctx.Done():
return
case t := <-chOut:
fmt.Println(t)
}
}
}()

// start ticker
fmt.Println("Starting ticker")
_ = ticker.Start(ctx)

// stop ticker after 5 seconds
fmt.Println("Stopping ticker after 2 seconds")
time.Sleep(2 * time.Second)
_ = ticker.Stop()

// try stopping ticker second time
fmt.Println("Stopping ticker a second time after 1 second")
time.Sleep(1 * time.Second)
_ = ticker.Stop()

// Restart ticker after 2 seconds
fmt.Println("Starting ticker again in 2 seconds")
time.Sleep(2 * time.Second)
_ = ticker.Start(ctx)

// Cancel the root context after 5 seconds
time.Sleep(5 * time.Second)
}
46 changes: 46 additions & 0 deletions examples/cron/stepTicker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"context"
"fmt"
"time"

"github.com/jantytgat/go-jobs/pkg/cron"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

schedule, err := cron.NewSchedule("*/2 * * * * *")
if err != nil {
panic(err)
}
chOut := make(chan time.Time)

maxTickers := 1000
tickers := make([]*cron.Ticker, maxTickers)

for i := 0; i < maxTickers; i++ {
tickers[i] = cron.NewTicker(schedule, chOut)
}

go func() {
for {
select {
case <-ctx.Done():
return
case t := <-chOut:
fmt.Println(t)
}
}
}()

// start ticker
for i := 0; i < maxTickers; i++ {
_ = tickers[i].Start(ctx)
}

time.Sleep(20*time.Second + 1)
cancel()
}
72 changes: 72 additions & 0 deletions examples/orchestrator/memory/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"fmt"
"log/slog"
"os"
"runtime"
"sync"
"time"

"github.com/google/uuid"

"github.com/jantytgat/go-jobs/pkg/cron"
"github.com/jantytgat/go-jobs/pkg/job"
"github.com/jantytgat/go-jobs/pkg/orchestrator"
"github.com/jantytgat/go-jobs/pkg/task"
"github.com/jantytgat/go-jobs/pkg/taskLibrary"
)

func main() {
var err error

c := job.NewMemoryCatalog()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
o := orchestrator.New(runtime.NumCPU(), orchestrator.WithCatalog(c), orchestrator.WithLogger(slog.New(slog.NewTextHandler(os.Stdout, nil))))
wg := &sync.WaitGroup{}
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 100; i++ {
var schedule cron.Schedule
if i%2 == 0 {
schedule, _ = cron.NewSchedule("*/2 * * * * *")
} else if i%3 == 0 {
schedule, _ = cron.NewSchedule("*/3 * * * * *")
} else if i%5 == 0 {
schedule, _ = cron.NewSchedule("*/5 * * * * *")
} else {
schedule, _ = cron.NewSchedule("* * * * * *")
}

t := make([]task.Task, 0)
t = append(t, taskLibrary.PrintTask{Message: fmt.Sprintf("Hello %d", i)})
t = append(t, taskLibrary.EmptyTask{})
t = append(t, taskLibrary.PrintTask{Message: fmt.Sprintf("this %d", i)})
t = append(t, taskLibrary.EmptyTask{})
t = append(t, taskLibrary.PrintTask{Message: fmt.Sprintf("is %d", i)})
t = append(t, taskLibrary.EmptyTask{})
t = append(t, taskLibrary.PrintTask{Message: fmt.Sprintf("my %d", i)})
t = append(t, taskLibrary.EmptyTask{})
t = append(t, taskLibrary.PrintTask{Message: fmt.Sprintf("message %d", i)})
t = append(t, taskLibrary.EmptyTask{})
t = append(t, taskLibrary.PrintTask{Message: fmt.Sprintf("Goodbye %d", i)})
t = append(t, taskLibrary.LogTask{Message: fmt.Sprintf("Done %d", i), Level: slog.LevelInfo})
j := job.New(uuid.New(), "sequenceJob", schedule, t, job.WithRunLimit(1))
if err = c.Add(j); err != nil {
panic(err)
}
}
}(wg)
wg.Wait()
fmt.Println(c.Statistics(), o.Statistics())
time.Sleep(2 * time.Second)
fmt.Println("STARTING")
o.Start(ctx)
time.Sleep(20 * time.Second)
o.Stop()
fmt.Println("FINAL STATS")
fmt.Println(c.Statistics(), o.Statistics())
}
60 changes: 60 additions & 0 deletions examples/task/poolPrintTask/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"context"
"fmt"
"time"

"github.com/jantytgat/go-jobs/pkg/task"
"github.com/jantytgat/go-jobs/pkg/taskLibrary"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

// hp := library.PrintTaskHandlerPool(ctx, time.Duration(5)*time.Second)
hp := task.NewHandlerPool(ctx, taskLibrary.PrintTaskHandler(time.Duration(1)*time.Second), 0)
chResult := make(chan task.HandlerResult, 2000)

go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case result := <-chResult:
if result.Error != nil {
fmt.Println(result)
}
}
}
}(ctx)

timer := time.After(time.Duration(1) * time.Second)
var exit bool
var i int
for {
if exit {
break
}

select {
case <-timer:
exit = true
default:
i++
hp.ChTasks <- task.HandlerTask{
Task: taskLibrary.PrintTask{Message: fmt.Sprintf("Task %04d", i)},
Pipeline: nil,
ChResult: chResult,
}
}
}
fmt.Println("Shutting down")
cancel()
time.Sleep(time.Duration(500) * time.Millisecond)
fmt.Println("Cancel is called")
stats := hp.Statistics()

fmt.Printf("A: %d / I: %d / R: %d / M: %d / T: %d\r\n", stats.ActiveWorkers, stats.IdleWorkers, stats.Workers, stats.MaxWorkers, stats.TasksProcessed)
}
14 changes: 14 additions & 0 deletions examples/task/printTask/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"context"
"fmt"

"github.com/jantytgat/go-jobs/pkg/taskLibrary"
)

func main() {
t := taskLibrary.PrintTask{Message: "Hello!"}
status, err := t.DefaultHandler().Execute(context.Background(), t, nil)
fmt.Printf("Status: %s, Error: %v\r\n", status, err)
}
81 changes: 81 additions & 0 deletions examples/task/repoPrintTask/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"context"
"fmt"
"time"

"github.com/jantytgat/go-jobs/pkg/task"
"github.com/jantytgat/go-jobs/pkg/taskLibrary"
)

func main() {
r := task.NewHandlerRepository()
ctx, cancel := context.WithCancel(context.Background())
chResult := make(chan task.HandlerResult, 2000)

go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case result := <-chResult:
if result.Error != nil {
fmt.Println(result)
}
}
}
}(ctx)

go func(ctx context.Context) {
timer := time.After(time.Duration(5) * time.Second)
var exit bool
var i int
for {
if exit {
break
}

select {
case <-timer:
exit = true
default:
i++
if err := r.Execute(ctx, task.HandlerTask{
Task: taskLibrary.PrintTask{Message: fmt.Sprintf("Task %04d", i)},
Pipeline: nil,
ChResult: chResult,
}); err != nil {
fmt.Println("EMPTY: ", err)
}
}
}
}(ctx)

timer := time.After(time.Duration(5) * time.Second)
var exit bool
var i int
for {
if exit {
break
}

select {
case <-timer:
exit = true
default:
i++
if err := r.Execute(ctx, task.HandlerTask{
Task: taskLibrary.PrintTask{Message: fmt.Sprintf("Task %04d", i)},
Pipeline: nil,
ChResult: chResult,
}); err != nil {
fmt.Println("PRINT: ", err)
}
}
}
cancel()
time.Sleep(time.Duration(500) * time.Millisecond)
fmt.Println("Cancel is called")
fmt.Println(r.Statistics())
}
43 changes: 43 additions & 0 deletions examples/task/sequence/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"fmt"
"log/slog"
"os"
"sync"

"github.com/jantytgat/go-jobs/pkg/task"
"github.com/jantytgat/go-jobs/pkg/taskLibrary"
)

func main() {
logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})
logger := slog.New(logHandler)

r := task.NewHandlerRepository()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tasks := make([]task.Task, 0)
for i := 0; i < 20000; i++ {
tasks = append(tasks, []task.Task{
taskLibrary.EmptyTask{},
taskLibrary.LogTask{Message: fmt.Sprintf("Task %d", i), Level: slog.LevelDebug},
taskLibrary.EmptyTask{},
}...)
}

wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if _, err := task.ExecuteSequence(ctx, logger, tasks, r); err != nil {
panic(err)
}
}()
}
wg.Wait()
fmt.Println(r.Statistics())
}
Loading

0 comments on commit a6aaf4d

Please sign in to comment.