Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove EnablePassiveHealthCheck flag #3029

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.21
v0.22
18 changes: 7 additions & 11 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ const (

func defaultEndpointRegistry() *routing.EndpointRegistry {
return routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 10,
MaxHealthCheckDropProbability: 1.0,
PassiveHealthCheck: &routing.PassiveHealthCheck{
Period: period,
MinRequests: 10,
MaxDropProbability: 1.0,
},
})
}

Expand All @@ -49,11 +50,8 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) {
}

func setupProxy(t *testing.T, doc string) (*testProxy, *httptest.Server) {
endpointRegistry := defaultEndpointRegistry()

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
EndpointRegistry: defaultEndpointRegistry(),
})
require.NoError(t, err)

Expand Down Expand Up @@ -106,12 +104,10 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) {
w.WriteHeader(http.StatusOK)
}))
defer service.Close()
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> "%s"`, service.URL)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
EndpointRegistry: defaultEndpointRegistry(),
})
if err != nil {
t.Fatal(err)
Expand Down
45 changes: 13 additions & 32 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,67 +145,54 @@ type OpenTracingParams struct {
ExcludeTags []string
}

type PassiveHealthCheck struct {
// The period of time after which the endpointregistry begins to calculate endpoints statistics
// from scratch
Period time.Duration

// The minimum number of total requests that should be sent to an endpoint in a single period to
// potentially opt out the endpoint from the list of healthy endpoints
MinRequests int64

// The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request
MaxDropProbability float64
}

func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, error) {
func InitPassiveHealthChecker(o map[string]string) (*routing.PassiveHealthCheck, error) {
if len(o) == 0 {
return false, &PassiveHealthCheck{}, nil
return nil, nil
}

result := &PassiveHealthCheck{}
result := &routing.PassiveHealthCheck{}
keysInitialized := make(map[string]struct{})

for key, value := range o {
switch key {
case "period":
period, err := time.ParseDuration(value)
if err != nil {
return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value)
return nil, fmt.Errorf("passive health check: invalid period value: %s", value)
}
if period < 0 {
return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value)
return nil, fmt.Errorf("passive health check: invalid period value: %s", value)
}
result.Period = period
case "min-requests":
minRequests, err := strconv.Atoi(value)
if err != nil {
return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
}
if minRequests < 0 {
return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
}
result.MinRequests = int64(minRequests)
case "max-drop-probability":
maxDropProbability, err := strconv.ParseFloat(value, 64)
if err != nil {
return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
}
if maxDropProbability < 0 || maxDropProbability > 1 {
return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
}
result.MaxDropProbability = maxDropProbability
default:
return false, nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value)
return nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value)
}

keysInitialized[key] = struct{}{}
}

if len(keysInitialized) != 3 {
return false, nil, fmt.Errorf("passive health check: missing required parameters")
return nil, fmt.Errorf("passive health check: missing required parameters")
}
return true, result, nil
return result, nil
}

// Proxy initialization options.
Expand Down Expand Up @@ -310,12 +297,6 @@ type Params struct {
// and returns some metadata about endpoint. Information about the metadata
// returned from the registry could be found in routing.Metrics interface.
EndpointRegistry *routing.EndpointRegistry

// EnablePassiveHealthCheck enables the healthy endpoints checker
EnablePassiveHealthCheck bool

// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheck
}

type (
Expand Down Expand Up @@ -790,7 +771,7 @@ func WithParams(p Params) *Proxy {
hostname := os.Getenv("HOSTNAME")

var healthyEndpointsChooser *healthyEndpoints
if p.EnablePassiveHealthCheck {
if p.EndpointRegistry.GetPassiveHealthCheck() != nil {
healthyEndpointsChooser = &healthyEndpoints{
rnd: rand.New(loadbalancer.NewLockedSource()),
endpointRegistry: p.EndpointRegistry,
Expand Down
69 changes: 27 additions & 42 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2283,35 +2283,31 @@ func BenchmarkAccessLogEnable(b *testing.B) { benchmarkAccessLog(b, "enableAcces

func TestInitPassiveHealthChecker(t *testing.T) {
for i, ti := range []struct {
inputArg map[string]string
expectedEnabled bool
expectedParams *PassiveHealthCheck
expectedError error
inputArg map[string]string
expectedParams *routing.PassiveHealthCheck
expectedError error
}{
{
inputArg: map[string]string{},
expectedEnabled: false,
expectedParams: nil,
expectedError: nil,
inputArg: map[string]string{},
expectedParams: nil,
expectedError: nil,
},
{
inputArg: map[string]string{
"period": "somethingInvalid",
"min-requests": "10",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "0.9",
},
expectedEnabled: true,
expectedParams: &PassiveHealthCheck{
expectedParams: &routing.PassiveHealthCheck{
Period: 1 * time.Minute,
MinRequests: 10,
MaxDropProbability: 0.9,
Expand All @@ -2324,59 +2320,53 @@ func TestInitPassiveHealthChecker(t *testing.T) {
"min-requests": "10",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: -1m"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: -1m"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "somethingInvalid",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "-10",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "somethingInvalid",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "-0.1",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "3.1415",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"),
},
{
inputArg: map[string]string{
Expand All @@ -2385,28 +2375,23 @@ func TestInitPassiveHealthChecker(t *testing.T) {
"max-drop-probability": "0.9",
"non-existing": "non-existing",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
/* forgot max-drop-probability */
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: missing required parameters"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: missing required parameters"),
},
} {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
enabled, params, err := InitPassiveHealthChecker(ti.inputArg)
assert.Equal(t, ti.expectedEnabled, enabled)
params, err := InitPassiveHealthChecker(ti.inputArg)
assert.Equal(t, ti.expectedError, err)
if enabled {
assert.Equal(t, ti.expectedParams, params)
}
assert.Equal(t, ti.expectedParams, params)
})
}
}
31 changes: 14 additions & 17 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ func newEntry() *entry {
}

type EndpointRegistry struct {
lastSeenTimeout time.Duration
statsResetPeriod time.Duration
minRequests int64
maxHealthCheckDropProbability float64
lastSeenTimeout time.Duration
passiveHealthCheck *PassiveHealthCheck

quit chan struct{}

Expand All @@ -107,11 +105,8 @@ type EndpointRegistry struct {
var _ PostProcessor = &EndpointRegistry{}

type RegistryOptions struct {
LastSeenTimeout time.Duration
PassiveHealthCheckEnabled bool
StatsResetPeriod time.Duration
MinRequests int64
MaxHealthCheckDropProbability float64
LastSeenTimeout time.Duration
PassiveHealthCheck *PassiveHealthCheck
}

func (r *EndpointRegistry) Do(routes []*Route) []*Route {
Expand Down Expand Up @@ -151,7 +146,7 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
}

func (r *EndpointRegistry) updateStats() {
ticker := time.NewTicker(r.statsResetPeriod)
ticker := time.NewTicker(r.passiveHealthCheck.Period)

for {
r.data.Range(func(key, value any) bool {
Expand All @@ -164,9 +159,9 @@ func (r *EndpointRegistry) updateStats() {

failed := e.totalFailedRoundTrips[curSlot].Load()
requests := e.totalRequests[curSlot].Load()
if requests > r.minRequests {
if requests > r.passiveHealthCheck.MinRequests {
failedRoundTripsRatio := float64(failed) / float64(requests)
e.healthCheckDropProbability.Store(min(failedRoundTripsRatio, r.maxHealthCheckDropProbability))
e.healthCheckDropProbability.Store(min(failedRoundTripsRatio, r.passiveHealthCheck.MaxDropProbability))
} else {
e.healthCheckDropProbability.Store(0.0)
}
Expand All @@ -193,17 +188,15 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
}

registry := &EndpointRegistry{
lastSeenTimeout: o.LastSeenTimeout,
statsResetPeriod: o.StatsResetPeriod,
minRequests: o.MinRequests,
maxHealthCheckDropProbability: o.MaxHealthCheckDropProbability,
lastSeenTimeout: o.LastSeenTimeout,
passiveHealthCheck: o.PassiveHealthCheck,

quit: make(chan struct{}),

now: time.Now,
data: sync.Map{},
}
if o.PassiveHealthCheckEnabled {
if o.PassiveHealthCheck != nil {
go registry.updateStats()
}

Expand All @@ -219,6 +212,10 @@ func (r *EndpointRegistry) GetMetrics(hostPort string) Metrics {
return e.(*entry)
}

func (r *EndpointRegistry) GetPassiveHealthCheck() *PassiveHealthCheck {
return r.passiveHealthCheck
}

func (r *EndpointRegistry) allMetrics() map[string]Metrics {
result := make(map[string]Metrics)
r.data.Range(func(k, v any) bool {
Expand Down
13 changes: 13 additions & 0 deletions routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ func (o MatchingOptions) ignoreTrailingSlash() bool {
return o&IgnoreTrailingSlash > 0
}

type PassiveHealthCheck struct {
// The period of time after which the endpointregistry begins to calculate endpoints statistics
// from scratch
Period time.Duration

// The minimum number of total requests that should be sent to an endpoint in a single period to
// potentially opt out the endpoint from the list of healthy endpoints
MinRequests int64

// The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request
MaxDropProbability float64
}

// DataClient instances provide data sources for
// route definitions.
type DataClient interface {
Expand Down
Loading
Loading