Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reverseproxy: active health check allows configurable health_passes and health_fails #6154

Merged
merged 4 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions caddytest/integration/reverseproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ func TestReverseProxyHealthCheck(t *testing.T) {
health_port 2021
health_interval 10ms
health_timeout 100ms
health_passes 1
health_fails 1
}
}
`, "caddyfile")
Expand Down
34 changes: 34 additions & 0 deletions modules/caddyhttp/reverseproxy/caddyfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// health_uri <uri>
// health_port <port>
// health_interval <interval>
// health_passes <num>
// health_fails <num>
// health_timeout <duration>
// health_status <status>
// health_body <regexp>
Expand Down Expand Up @@ -447,6 +449,38 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
h.HealthChecks.Active.ExpectBody = d.Val()

case "health_passes":
if !d.NextArg() {
return d.ArgErr()
}
if h.HealthChecks == nil {
h.HealthChecks = new(HealthChecks)
}
if h.HealthChecks.Active == nil {
h.HealthChecks.Active = new(ActiveHealthChecks)
}
passes, err := strconv.Atoi(d.Val())
if err != nil {
return d.Errf("invalid passes count '%s': %v", d.Val(), err)
}
h.HealthChecks.Active.Passes = passes

case "health_fails":
if !d.NextArg() {
return d.ArgErr()
}
if h.HealthChecks == nil {
h.HealthChecks = new(HealthChecks)
}
if h.HealthChecks.Active == nil {
h.HealthChecks.Active = new(ActiveHealthChecks)
}
fails, err := strconv.Atoi(d.Val())
if err != nil {
return d.Errf("invalid fails count '%s': %v", d.Val(), err)
}
h.HealthChecks.Active.Fails = fails

case "max_fails":
if !d.NextArg() {
return d.ArgErr()
Expand Down
56 changes: 49 additions & 7 deletions modules/caddyhttp/reverseproxy/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ type ActiveHealthChecks struct {
// considering it unhealthy (default 5s).
Timeout caddy.Duration `json:"timeout,omitempty"`

// Number of consecutive health check passes before marking
// a previously unhealthy backend as healthy again (default 1).
Passes int `json:"passes,omitempty"`

// Number of consecutive health check failures before marking
// a previously healthy backend as unhealthy (default 1).
Fails int `json:"fails,omitempty"`

// The maximum response body to download from the backend
// during a health check.
MaxSize int64 `json:"max_size,omitempty"`
Expand Down Expand Up @@ -167,6 +175,14 @@ func (a *ActiveHealthChecks) Provision(ctx caddy.Context, h *Handler) error {
}
}

if a.Passes < 1 {
a.Passes = 1
}

if a.Fails < 1 {
a.Fails = 1
}

return nil
}

Expand Down Expand Up @@ -373,9 +389,37 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre
}

markUnhealthy := func() {
// dispatch an event that the host newly became unhealthy
if upstream.setHealthy(false) {
h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr})
// increment failures and then check if it has reached the threshold to mark unhealthy
err := upstream.Host.countHealthFail(1)
if err != nil {
h.HealthChecks.Active.logger.Error("could not count active health failure",
zap.String("host", upstream.Dial),
zap.Error(err))
return
}
if upstream.Host.activeHealthFails() >= h.HealthChecks.Active.Fails {
// dispatch an event that the host newly became unhealthy
if upstream.setHealthy(false) {
h.events.Emit(h.ctx, "unhealthy", map[string]any{"host": hostAddr})
upstream.Host.resetHealth()
}
}
}

markHealthy := func() {
// increment passes and then check if it has reached the threshold to be healthy
err := upstream.Host.countHealthPass(1)
if err != nil {
h.HealthChecks.Active.logger.Error("could not count active health pass",
zap.String("host", upstream.Dial),
zap.Error(err))
return
}
if upstream.Host.activeHealthPasses() >= h.HealthChecks.Active.Passes {
if upstream.setHealthy(true) {
h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr})
upstream.Host.resetHealth()
}
}
}

Expand Down Expand Up @@ -439,10 +483,8 @@ func (h *Handler) doActiveHealthCheck(dialInfo DialInfo, hostAddr string, upstre
}

// passed health check parameters, so mark as healthy
if upstream.setHealthy(true) {
h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr))
h.events.Emit(h.ctx, "healthy", map[string]any{"host": hostAddr})
}
h.HealthChecks.Active.logger.Info("host is up", zap.String("host", hostAddr))
markHealthy()

return nil
}
Expand Down
42 changes: 40 additions & 2 deletions modules/caddyhttp/reverseproxy/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ func (u *Upstream) fillHost() {
// Host is the basic, in-memory representation of the state of a remote host.
// Its fields are accessed atomically and Host values must not be copied.
type Host struct {
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
fails int64
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
fails int64
activePasses int64
activeFails int64
}

// NumRequests returns the number of active requests to the upstream.
Expand All @@ -150,6 +152,16 @@ func (h *Host) Fails() int {
return int(atomic.LoadInt64(&h.fails))
}

// activeHealthPasses returns the number of consecutive active health check passes with the upstream.
func (h *Host) activeHealthPasses() int {
return int(atomic.LoadInt64(&h.activePasses))
}

// activeHealthFails returns the number of consecutive active health check failures with the upstream.
func (h *Host) activeHealthFails() int {
return int(atomic.LoadInt64(&h.activeFails))
}

// countRequest mutates the active request count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countRequest(delta int) error {
Expand All @@ -170,6 +182,32 @@ func (h *Host) countFail(delta int) error {
return nil
}

// countHealthPass mutates the recent passes count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countHealthPass(delta int) error {
result := atomic.AddInt64(&h.activePasses, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
return nil
}

// countHealthFail mutates the recent failures count by
// delta. It returns an error if the adjustment fails.
func (h *Host) countHealthFail(delta int) error {
result := atomic.AddInt64(&h.activeFails, int64(delta))
if result < 0 {
return fmt.Errorf("count below 0: %d", result)
}
return nil
}

// resetHealth resets the health check counters.
func (h *Host) resetHealth() {
atomic.StoreInt64(&h.activePasses, 0)
atomic.StoreInt64(&h.activeFails, 0)
}

// healthy returns true if the upstream is not actively marked as unhealthy.
// (This returns the status only from the "active" health checks.)
func (u *Upstream) healthy() bool {
Expand Down
Loading