Skip to content

Commit 0e7d36a

Browse files
authored
feat(worker): Added worker pool health check probe (ankorstore#267)
1 parent 27e0de4 commit 0e7d36a

File tree

5 files changed

+194
-14
lines changed

5 files changed

+194
-14
lines changed

worker/README.md

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* [Logging](#logging)
1717
* [Tracing](#tracing)
1818
* [Metrics](#metrics)
19+
* [Healthcheck](#healthcheck)
1920
<!-- TOC -->
2021

2122
## Installation
@@ -135,10 +136,10 @@ import (
135136
func main() {
136137
// create the pool
137138
pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
138-
worker.WithGlobalDeferredStartThreshold(1), // will defer all workers start of 1 second
139-
worker.WithGlobalMaxExecutionsAttempts(2), // will run 2 times max failing workers
140-
worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)), // registers the ClassicWorker, with a deferred start of 3 second
141-
worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs max
139+
worker.WithGlobalDeferredStartThreshold(1), // will defer all workers start of 1 second
140+
worker.WithGlobalMaxExecutionsAttempts(2), // will run 2 times max failing workers
141+
worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)), // registers the ClassicWorker, with a deferred start of 3 second
142+
worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs max
142143
)
143144

144145
// start the pool
@@ -297,4 +298,39 @@ func main() {
297298
// start the pool
298299
pool.Start(context.Background())
299300
}
300-
```
301+
```
302+
303+
### Healthcheck
304+
305+
This module provides an [WorkerProbe](healthcheck/probe.go), compatible with
306+
the [healthcheck module](https://github.com/ankorstore/yokai/tree/main/healthcheck):
307+
308+
```go
309+
package main
310+
311+
import (
312+
"context"
313+
314+
yokaihc "github.com/ankorstore/yokai/healthcheck"
315+
"github.com/ankorstore/yokai/worker"
316+
"github.com/ankorstore/yokai/worker/healthcheck"
317+
)
318+
319+
func main() {
320+
// create the pool
321+
pool, _ := worker.NewDefaultWorkerPoolFactory().Create()
322+
323+
// create the checker with the worker probe
324+
checker, _ := yokaihc.NewDefaultCheckerFactory().Create(
325+
yokaihc.WithProbe(healthcheck.NewWorkerProbe(pool)),
326+
)
327+
328+
// start the pool
329+
pool.Start(context.Background())
330+
331+
// run the checker
332+
res, _ := checker.Check(context.Background(), yokaihc.Readiness)
333+
}
334+
```
335+
336+
This probe is successful if all the executions statuses of the [WorkerPool](pool.go) are healthy.

worker/go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ module github.com/ankorstore/yokai/worker
33
go 1.20
44

55
require (
6-
github.com/ankorstore/yokai/generate v1.1.0
6+
github.com/ankorstore/yokai/generate v1.2.0
7+
github.com/ankorstore/yokai/healthcheck v1.1.0
78
github.com/ankorstore/yokai/log v1.2.0
8-
github.com/ankorstore/yokai/trace v1.2.0
9-
github.com/prometheus/client_golang v1.19.0
9+
github.com/ankorstore/yokai/trace v1.3.0
10+
github.com/prometheus/client_golang v1.19.1
1011
github.com/stretchr/testify v1.9.0
1112
go.opentelemetry.io/otel v1.24.0
1213
go.opentelemetry.io/otel/sdk v1.24.0

worker/go.sum

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
github.com/ankorstore/yokai/generate v1.1.0 h1:tu3S+uEYh+2qNo8Rf/WxWneDjh49YgDPzSnJfF8JkXA=
2-
github.com/ankorstore/yokai/generate v1.1.0/go.mod h1:gqS/i20wnvCOhcXydYdiGcASzBaeuW7GK6YYg/kkuY4=
1+
github.com/ankorstore/yokai/generate v1.2.0 h1:37siukjPGSS2kRnCnPhiuiF373+0tgwp0teXHnMsBhA=
2+
github.com/ankorstore/yokai/generate v1.2.0/go.mod h1:gqS/i20wnvCOhcXydYdiGcASzBaeuW7GK6YYg/kkuY4=
3+
github.com/ankorstore/yokai/healthcheck v1.1.0 h1:PXkEccym7iaVnQltpM5UFi0Xl0n+5rZDzlQju6HmGms=
4+
github.com/ankorstore/yokai/healthcheck v1.1.0/go.mod h1:IiYgjRa4G3OLZMwAuacuryZZAfDHsBH8PQoK4PgRdZ4=
35
github.com/ankorstore/yokai/log v1.2.0 h1:jiuDiC0dtqIGIOsFQslUHYoFJ1qjI+rOMa6dI1LBf2Y=
46
github.com/ankorstore/yokai/log v1.2.0/go.mod h1:MVvUcms1AYGo0BT6l88B9KJdvtK6/qGKdgyKVXfbmyc=
5-
github.com/ankorstore/yokai/trace v1.2.0 h1:Jnl++IGNpDYumsZJXP3qjhMdvyHbejiajQwIlU604w0=
6-
github.com/ankorstore/yokai/trace v1.2.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA=
7+
github.com/ankorstore/yokai/trace v1.3.0 h1:0ji32oymIcxTmH5h6GRWLo5ypwBbWrZkXRf9rWF9070=
8+
github.com/ankorstore/yokai/trace v1.3.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA=
79
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
810
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
911
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
@@ -37,8 +39,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
3739
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
3840
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3941
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
40-
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
41-
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
42+
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
43+
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
4244
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
4345
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
4446
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=

worker/healthcheck/probe.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package healthcheck
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/ankorstore/yokai/healthcheck"
9+
"github.com/ankorstore/yokai/worker"
10+
)
11+
12+
// DefaultProbeName is the name of the worker probe.
13+
const DefaultProbeName = "worker"
14+
15+
// WorkerProbe is a probe compatible with the [healthcheck] module.
16+
//
17+
// [healthcheck]: https://github.com/ankorstore/yokai/tree/main/healthcheck
18+
type WorkerProbe struct {
19+
name string
20+
pool *worker.WorkerPool
21+
}
22+
23+
// NewWorkerProbe returns a new [WorkerProbe].
24+
func NewWorkerProbe(pool *worker.WorkerPool) *WorkerProbe {
25+
return &WorkerProbe{
26+
name: DefaultProbeName,
27+
pool: pool,
28+
}
29+
}
30+
31+
// Name returns the name of the [WorkerProbe].
32+
func (p *WorkerProbe) Name() string {
33+
return p.name
34+
}
35+
36+
// SetName sets the name of the [WorkerProbe].
37+
func (p *WorkerProbe) SetName(name string) *WorkerProbe {
38+
p.name = name
39+
40+
return p
41+
}
42+
43+
// Check returns a successful [healthcheck.CheckerProbeResult] if the worker pool executions are all in healthy status.
44+
func (p *WorkerProbe) Check(ctx context.Context) *healthcheck.CheckerProbeResult {
45+
success := true
46+
messages := []string{}
47+
48+
for name, execution := range p.pool.Executions() {
49+
if execution.Status() == worker.Unknown || execution.Status() == worker.Error {
50+
success = false
51+
}
52+
53+
messages = append(messages, fmt.Sprintf("%s: %s", name, execution.Status()))
54+
}
55+
56+
return healthcheck.NewCheckerProbeResult(success, strings.Join(messages, ", "))
57+
}

worker/healthcheck/probe_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package healthcheck_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/ankorstore/yokai/worker"
9+
"github.com/ankorstore/yokai/worker/healthcheck"
10+
"github.com/ankorstore/yokai/worker/testdata/workers"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestWorkerProbe(t *testing.T) {
15+
t.Parallel()
16+
17+
t.Run("custom name", func(t *testing.T) {
18+
t.Parallel()
19+
20+
probe := healthcheck.NewWorkerProbe(&worker.WorkerPool{})
21+
22+
probe.SetName("foo")
23+
24+
assert.Equal(t, "foo", probe.Name())
25+
})
26+
27+
t.Run("check empty pool", func(t *testing.T) {
28+
t.Parallel()
29+
30+
pool, err := worker.NewDefaultWorkerPoolFactory().Create(
31+
worker.WithWorker(workers.NewClassicWorker()),
32+
)
33+
assert.NoError(t, err)
34+
35+
probe := healthcheck.NewWorkerProbe(pool)
36+
37+
res := probe.Check(context.Background())
38+
39+
assert.True(t, res.Success)
40+
assert.Empty(t, res.Message)
41+
})
42+
43+
t.Run("check success pool", func(t *testing.T) {
44+
t.Parallel()
45+
46+
pool, err := worker.NewDefaultWorkerPoolFactory().Create(
47+
worker.WithWorker(workers.NewClassicWorker()),
48+
)
49+
assert.NoError(t, err)
50+
51+
probe := healthcheck.NewWorkerProbe(pool)
52+
53+
err = pool.Start(context.Background())
54+
assert.NoError(t, err)
55+
56+
time.Sleep(15 * time.Millisecond)
57+
58+
res := probe.Check(context.Background())
59+
60+
assert.True(t, res.Success)
61+
assert.Equal(t, "ClassicWorker: success", res.Message)
62+
})
63+
64+
t.Run("check error pool", func(t *testing.T) {
65+
t.Parallel()
66+
67+
pool, err := worker.NewDefaultWorkerPoolFactory().Create(
68+
worker.WithWorker(workers.NewErrorWorker()),
69+
)
70+
assert.NoError(t, err)
71+
72+
probe := healthcheck.NewWorkerProbe(pool)
73+
74+
err = pool.Start(context.Background())
75+
assert.NoError(t, err)
76+
77+
time.Sleep(15 * time.Millisecond)
78+
79+
res := probe.Check(context.Background())
80+
81+
assert.False(t, res.Success)
82+
assert.Equal(t, "ErrorWorker: error", res.Message)
83+
})
84+
}

0 commit comments

Comments
 (0)