Skip to content

Commit

Permalink
feat: run checks in parallel
Browse the repository at this point in the history
closes #148

Signed-off-by: Clément Nussbaumer <clement.nussbaumer@postfinance.ch>
  • Loading branch information
clementnuss committed Jul 22, 2024
1 parent a2d392f commit 04caaa4
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions internal/servicecheck/servicecheck.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"net"
"net/http"
"os"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -70,41 +71,60 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry,
// is respected.
//
// The four base checks and one check per selected neighbour run concurrently.
// Run blocks until every check it started has finished and then stores a
// snapshot of all results in c.LastCheckResult.
func (c *Checker) Run() {
	var (
		result sync.Map
		wg     sync.WaitGroup
	)

	// Cache result (used for /alive handler).
	defer func() {
		res := make(map[string]any)

		result.Range(func(key, value any) bool {
			// Only string keys are stored in result, so the assertion
			// cannot fail here.
			k, _ := key.(string)
			res[k] = value

			return true
		})

		c.LastCheckResult = res
	}()

	// Deferred calls run last-in-first-out, so this wait completes before the
	// snapshot above is taken, on every return path. Without it, the early
	// returns below would snapshot the map while the checks started here
	// were still running, and their results would be missing.
	defer wg.Wait()

	wg.Add(4)

	go c.measure(&wg, &result, c.APIServerDirect, APIServerDirect)
	go c.measure(&wg, &result, c.APIServerDNS, APIServerDNS)
	go c.measure(&wg, &result, c.MeIngress, meIngress)
	go c.measure(&wg, &result, c.MeService, meService)

	if c.SkipCheckNeighbourhood {
		result.Store(NeighbourhoodState, skippedStr)
		return
	}

	neighbours, err := c.getNeighbours(context.Background(), c.KubenurseNamespace, c.NeighbourFilter)
	if err != nil {
		result.Store(NeighbourhoodState, err.Error())
		return
	}

	result.Store(NeighbourhoodState, okStr)
	// The unfiltered neighbour list is reported; filtering only limits which
	// neighbours are actually queried below.
	result.Store(Neighbourhood, neighbours)

	if c.NeighbourLimit > 0 && len(neighbours) > c.NeighbourLimit {
		neighbours = c.filterNeighbours(neighbours)
	}

	wg.Add(len(neighbours))

	for _, neighbour := range neighbours {
		// Resolve the target into a per-iteration local so the closure, which
		// runs in its own goroutine, never captures the loop variable (shared
		// across iterations before Go 1.22).
		target := podIPtoURL(neighbour.PodIP, c.UseTLS)

		check := func(ctx context.Context) string {
			return c.doRequest(ctx, target, true)
		}

		go c.measure(&wg, &result, check, "path_"+neighbour.NodeName)
	}
}

// RunScheduled runs the checks in the specified interval which can be used to keep the metrics up-to-date. This
@@ -169,12 +189,13 @@ func (c *Checker) MeService(ctx context.Context) string {
}

// measure runs check with a context that carries requestType under the
// kubenurseTypeKey{} key and records the value check returns under
// requestType in res.
//
// NOTE(review): this span comes from a rendered diff, so removed and added
// lines appear interleaved without +/- markers. The map-based signature, the
// first ctx assignment and the `res[requestType] = ...` line look like the
// removed side of the hunk — confirm against the repository.
func (c *Checker) measure(res map[string]any, check Check, requestType string) {
func (c *Checker) measure(wg *sync.WaitGroup, res *sync.Map, check Check, requestType string) {
// Add our label (check type) to the context so our http tracer can annotate
// metrics and errors with the label
ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, requestType)
// Mark this check as finished on the caller's WaitGroup when measure returns.
defer wg.Done()

res[requestType] = check(ctx)
ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, requestType)
// Store through sync.Map, which is safe for concurrent use by the goroutines
// Run starts.
res.Store(requestType, check(ctx))
}

func podIPtoURL(podIP string, useTLS bool) string {
Expand Down

0 comments on commit 04caaa4

Please sign in to comment.