Skip to content

Commit

Permalink
fix(activator): Correct probePodIPs to return correct noop value
Browse files Browse the repository at this point in the history
The probePodIPs sometimes (depending on configuration) will return a
true for noop when in fact there are changes.  This is due to changes to
the healthy endpoints being possible outside of probing.

- Change the unchanged value to just compare the existing healthy set
  with the new one.
- Add tests to cover most of the different cases of behavior for the
  probePodIPs function.

NOTE: There is one test case `no changes without probes` that now shows
different behavior than prior code.  Prior code would return a false for
noop.  After reviewing the calling code this did not seem to make sense
for a non-probing non-updating call to update the endpoints (given the
other non-probe changes are now accounted for).  So this was left as it
is now with the simple unchanged value logic.
  • Loading branch information
arsenetar committed Sep 5, 2023
1 parent 278bfcc commit b2de4aa
Show file tree
Hide file tree
Showing 2 changed files with 283 additions and 56 deletions.
8 changes: 3 additions & 5 deletions pkg/activator/net/revision_backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,13 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s
var probeGroup errgroup.Group
healthyDests := make(chan string, dests.Len())

var probed bool
var sawNotMesh atomic.Bool
for dest := range dests {
if healthy.Has(dest) {
// If we already know it's healthy we don't need to probe again.
continue
}

probed = true

dest := dest // Standard Go concurrency pattern.
probeGroup.Go(func() error {
ok, notMesh, err := rw.probe(ctx, dest)
Expand All @@ -282,8 +279,6 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s
err = probeGroup.Wait()
close(healthyDests)

unchanged := probed && len(healthyDests) == 0

for d := range healthyDests {
healthy.Insert(d)
}
Expand All @@ -295,6 +290,9 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s
}
}

// Unchanged only if we match the incoming healthy set, as this handles all possible updates
unchanged := healthy.Equal(rw.healthyPods)

return healthy, unchanged, sawNotMesh.Load(), err
}

Expand Down
331 changes: 280 additions & 51 deletions pkg/activator/net/revision_backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,67 +1522,296 @@ func TestServiceMoreThanOne(t *testing.T) {

// More focused test around the probing of pods and the handling of different behaviors
func TestProbePodIPs(t *testing.T) {
fakeRT := activatortest.FakeRoundTripper{
ExpectHost: testRevision,
ProbeHostResponses: map[string][]activatortest.FakeResponse{
"10.10.1.1": {{
Code: http.StatusInternalServerError,
Err: errors.New("podIP transport error"),
}},
"10.10.1.2": {{
Code: http.StatusOK,
Body: queue.Name,
Delay: 250 * time.Millisecond,
}},
"10.10.1.3": {{
Code: http.StatusOK,
Body: queue.Name,
}},
},
type input struct {
current dests
healthy sets.String

Check failure on line 1527 in pkg/activator/net/revision_backends_test.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

SA1019: sets.String is deprecated: use generic Set instead. new ways: s1 := Set[string]{} s2 := New[string]() (staticcheck)
meshMode netcfg.MeshCompatibilityMode
enableProbeOptimization bool
hostResponses map[string][]activatortest.FakeResponse
}

// Minimally constructed revisionWatcher just to have what is needed for probing
rw := &revisionWatcher{
clusterIPHealthy: true,
podsAddressable: true,
rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision},
logger: TestLogger(t),
transport: pkgnetwork.RoundTripperFunc(fakeRT.RT),
meshMode: netcfg.MeshCompatibilityModeAuto,
enableProbeOptimisation: true,
type expected struct {
healthy sets.String

Check failure on line 1534 in pkg/activator/net/revision_backends_test.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

SA1019: sets.String is deprecated: use generic Set instead. new ways: s1 := Set[string]{} s2 := New[string]() (staticcheck)
noop bool
notMesh bool
success bool
numProbes int32
}

// Initial state for tests
cur := dests{
ready: sets.NewString("10.10.1.3", "10.10.1.2"),
notReady: sets.NewString("10.10.1.1"),
type test struct {
name string
input input
expected expected
}

// Test all pods healthy (skips probes)
rw.healthyPods = cur.ready.Union(cur.notReady)
healthy, noop, notMesh, err := rw.probePodIPs(cur.ready, cur.notReady)
if !healthy.Equal(rw.healthyPods) {
t.Errorf("Healthy does not match, got %#v, want %#v diff: %s", healthy, rw.healthyPods, cmp.Diff(healthy, cur.ready))
}
if !noop || notMesh || err != nil {
t.Errorf("Unexpected values: noop=%t (true), notMesh=%t (false), err=%s (nil)", noop, notMesh, err)
tests := []test{
{
name: "all healthy", // Test skipping probes when all endpoints are healthy
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.2"),
},
healthy: sets.NewString("10.10.1.1", "10.10.1.2"),
},
expected: expected{
healthy: sets.NewString("10.10.1.1", "10.10.1.2"),
noop: true,
notMesh: false,
success: true,
numProbes: 0,
},
},
{
name: "one pod fails probe", // Test that we probe all pods when one fails
input: input{
current: dests{
notReady: sets.NewString("10.10.1.1", "10.10.1.2", "10.10.1.3"),
},
hostResponses: map[string][]activatortest.FakeResponse{
"10.10.1.1": {{
Err: errors.New("podIP transport error"),
}},
"10.10.1.2": {{
Code: http.StatusOK,
Body: queue.Name,
Delay: 250 * time.Millisecond,
}},
},
enableProbeOptimization: true,
},
expected: expected{
healthy: sets.NewString("10.10.1.2", "10.10.1.3"),
noop: false,
notMesh: true,
success: false,
numProbes: 3,
},
},
{
name: "ready pods skipped without mesh auto",
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.2"),
},
enableProbeOptimization: true,
meshMode: netcfg.MeshCompatibilityModeDisabled,
},
expected: expected{
healthy: sets.NewString("10.10.1.1", "10.10.1.2"),
noop: false,
notMesh: true,
success: true,
numProbes: 1,
},
},
{
name: "ready pods not skipped with mesh auto",
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.2"),
},
enableProbeOptimization: true,
meshMode: netcfg.MeshCompatibilityModeAuto,
},
expected: expected{
healthy: sets.NewString("10.10.1.1", "10.10.1.2"),
noop: false,
notMesh: true,
success: true,
numProbes: 2,
},
},
{
name: "only ready pods healthy without probe optimization", // NOTE: prior test is effectively this one with probe optimization
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.2"),
},
enableProbeOptimization: false,
meshMode: netcfg.MeshCompatibilityModeAuto,
},
expected: expected{
healthy: sets.NewString("10.10.1.1"),
noop: false,
notMesh: true,
success: true,
numProbes: 2,
},
},
{
name: "removes non-existent pods from healthy",
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.2"),
},
healthy: sets.NewString("10.10.1.1", "10.10.1.2", "10.10.1.3"),
enableProbeOptimization: true,
meshMode: netcfg.MeshCompatibilityModeDisabled,
},
expected: expected{
healthy: sets.NewString("10.10.1.1", "10.10.1.2"),
noop: false,
notMesh: false,
success: true,
numProbes: 0,
},
},
{
name: "non-probe additions count as changes", // Testing case where ready pods are added but probes do not add more
input: input{
current: dests{
ready: sets.NewString("10.10.1.1", "10.10.1.2"),
notReady: sets.NewString("10.10.1.3"),
},
healthy: sets.NewString("10.10.1.1"),
enableProbeOptimization: false,
meshMode: netcfg.MeshCompatibilityModeDisabled,
},
expected: expected{
healthy: sets.NewString("10.10.1.1", "10.10.1.2"),
noop: false,
notMesh: true,
success: true,
numProbes: 1,
},
},
{
name: "non-probe removals count as changes", // Testing case where non-existent pods are removed with no probe changes
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.3"),
},
healthy: sets.NewString("10.10.1.1", "10.10.1.2"),
enableProbeOptimization: false,
meshMode: netcfg.MeshCompatibilityModeDisabled,
},
expected: expected{
healthy: sets.NewString("10.10.1.1"),
noop: false,
notMesh: true,
success: true,
numProbes: 1,
},
},
{
name: "no changes with probes",
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.3"),
},
healthy: sets.NewString("10.10.1.1"),
enableProbeOptimization: false,
meshMode: netcfg.MeshCompatibilityModeDisabled,
},
expected: expected{
healthy: sets.NewString("10.10.1.1"),
noop: true,
notMesh: true,
success: true,
numProbes: 1,
},
},
{
name: "no changes without probes",
input: input{
current: dests{
ready: sets.NewString("10.10.1.1", "10.10.1.3"),
},
healthy: sets.NewString("10.10.1.1", "10.10.1.3"),
enableProbeOptimization: false,
meshMode: netcfg.MeshCompatibilityModeDisabled,
},
expected: expected{
healthy: sets.NewString("10.10.1.1", "10.10.1.3"),
noop: true,
notMesh: false,
success: true,
numProbes: 0,
},
},
{
name: "mesh probe error",
input: input{
current: dests{
ready: sets.NewString("10.10.1.1"),
notReady: sets.NewString("10.10.1.3"),
},
healthy: sets.NewString("10.10.1.1"),
enableProbeOptimization: true,
meshMode: netcfg.MeshCompatibilityModeAuto,
hostResponses: map[string][]activatortest.FakeResponse{
"10.10.1.3": {{
Code: http.StatusServiceUnavailable,
}},
},
},
expected: expected{
healthy: sets.NewString("10.10.1.1"),
noop: true,
notMesh: false,
success: false,
numProbes: 1,
},
},
}
if numProbes := fakeRT.NumProbes.Load(); numProbes != 0 {
t.Errorf("Unexpected number of probes, got %d, want %d", numProbes, 0)

fakeRT := activatortest.FakeRoundTripper{
ExpectHost: testRevision,
ProbeResponses: []activatortest.FakeResponse{{
Code: http.StatusOK,
Body: queue.Name,
}},
}

// Test one pod probe errors (using mesh compatibly auto and probe optimization), even if one
// pod errors all probes should still be completed
fakeRT.NumProbes.Store(0)
rw.healthyPods = sets.NewString()
healthy, noop, notMesh, err = rw.probePodIPs(cur.ready, cur.notReady)
if !healthy.Equal(cur.ready) {
t.Errorf("Healthy does not match, got %#v, want %#v diff: %s", healthy, cur.ready, cmp.Diff(healthy, cur.ready))
// Minimally constructed revisionWatcher just to have what is needed for probing
rw := &revisionWatcher{
rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision},
logger: TestLogger(t),
transport: pkgnetwork.RoundTripperFunc(fakeRT.RT),
}
if noop || !notMesh || err == nil {
t.Errorf("Unexpected values: noop=%t (false), notMesh=%t (true), err=%s (non-nil)", noop, notMesh, err)

// Helper function to run the test and validate the results
testFunc := func(testName string, input input, expected expected) {
rw.enableProbeOptimisation = input.enableProbeOptimization
rw.meshMode = input.meshMode
rw.healthyPods = input.healthy

fakeRT.ProbeHostResponses = input.hostResponses
fakeRT.NumProbes.Store(0)

healthy, noop, notMesh, err := rw.probePodIPs(input.current.ready, input.current.notReady)
if !healthy.Equal(expected.healthy) {
t.Errorf("%s: Healthy does not match, got %v, want %v diff: %s",
testName, healthy, expected.healthy, cmp.Diff(healthy, expected.healthy))
}
if noop != expected.noop {
t.Errorf("%s: Unexpected value for noop, got %t, want %t", testName, noop, expected.noop)
}
if notMesh != expected.notMesh {
t.Errorf("%s: Unexpected value for notMesh, got %t, want %t",
testName, notMesh, expected.notMesh)
}
if err != nil && expected.success {
t.Errorf("%s: Unexpected error, got %v, want nil", testName, err)
} else if err == nil && !expected.success {
t.Errorf("%s: Unexpected error, got %v, want non-nil", testName, err)
}
if numProbes := fakeRT.NumProbes.Load(); numProbes != expected.numProbes {
t.Errorf("%s: Unexpected number of probes, got %d, want %d",
testName, numProbes, expected.numProbes)
}
}
if numProbes := fakeRT.NumProbes.Load(); numProbes != 3 {
t.Errorf("Unexpected number of probes, got %d, want %d", numProbes, 3)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testFunc(t.Name(), test.input, test.expected)
})
}
}

0 comments on commit b2de4aa

Please sign in to comment.