Skip to content

Commit

Permalink
fix(activator): Don't cancel all probes on one probe failure (#14303)
Browse files Browse the repository at this point in the history
* fix(activator): Don't cancel all probes on one probe failure

By using errgroup.WithContext, all probes are cancelled on the first
error returned.  This changes to use an errgroup without a
context/cancellation.  So all probes are allowed to run to completion
and one failed probe does not cause all probes to exit.

Ref #14200

* fix(activator): Update tests to include pod IP probe tests

- Add pod IP probe tests to directly test the pod IP probing behavior
  - Add test for no-probe optimization when all healthy
  - Add test for a pod returning an error to verify it does not block /
    cancel other probes (confirmed test with prior code which fails)
- Update the fake roundtripper to support the new pod IP probe tests by
  introducing the Delay field to be used to delay the response.  Default
  handling skips the delay if not set.
- Add comment to errgroup change since this had been correct before and
  was incorrectly changed. (Additionally change to use original form.)

* refactor(activator): Refactor roundtripper for test

Update to use a function for the delay to reduce duplicate code.
  • Loading branch information
arsenetar authored Sep 12, 2023
1 parent 6567552 commit 997d8ef
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 5 deletions.
6 changes: 4 additions & 2 deletions pkg/activator/net/revision_backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s
ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
defer cancel()

probeGroup, egCtx := errgroup.WithContext(ctx)
// Empty errgroup is used as cancellation on first error is not desired, all probes should be
// attempted even if one fails.
var probeGroup errgroup.Group
healthyDests := make(chan string, dests.Len())

var probed bool
Expand All @@ -264,7 +266,7 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s

dest := dest // Standard Go concurrency pattern.
probeGroup.Go(func() error {
ok, notMesh, err := rw.probe(egCtx, dest)
ok, notMesh, err := rw.probe(ctx, dest)
if ok && (ready.Has(dest) || rw.enableProbeOptimisation) {
healthyDests <- dest
}
Expand Down
67 changes: 67 additions & 0 deletions pkg/activator/net/revision_backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,3 +1519,70 @@ func TestServiceMoreThanOne(t *testing.T) {
case <-time.After(updateTimeout):
}
}

// More focused test around the probing of pods and the handling of different behaviors
func TestProbePodIPs(t *testing.T) {
fakeRT := activatortest.FakeRoundTripper{
ExpectHost: testRevision,
ProbeHostResponses: map[string][]activatortest.FakeResponse{
"10.10.1.1": {{
Code: http.StatusInternalServerError,
Err: errors.New("podIP transport error"),
}},
"10.10.1.2": {{
Code: http.StatusOK,
Body: queue.Name,
Delay: 250 * time.Millisecond,
}},
"10.10.1.3": {{
Code: http.StatusOK,
Body: queue.Name,
}},
},
}

// Minimally constructed revisionWatcher just to have what is needed for probing
rw := &revisionWatcher{
clusterIPHealthy: true,
podsAddressable: true,
rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision},
logger: TestLogger(t),
transport: pkgnetwork.RoundTripperFunc(fakeRT.RT),
meshMode: netcfg.MeshCompatibilityModeAuto,
enableProbeOptimisation: true,
}

// Initial state for tests
cur := dests{
ready: sets.NewString("10.10.1.3", "10.10.1.2"),
notReady: sets.NewString("10.10.1.1"),
}

// Test all pods healthy (skips probes)
rw.healthyPods = cur.ready.Union(cur.notReady)
healthy, noop, notMesh, err := rw.probePodIPs(cur.ready, cur.notReady)
if !healthy.Equal(rw.healthyPods) {
t.Errorf("Healthy does not match, got %#v, want %#v diff: %s", healthy, rw.healthyPods, cmp.Diff(healthy, cur.ready))
}
if !noop || notMesh || err != nil {
t.Errorf("Unexpected values: noop=%t (true), notMesh=%t (false), err=%s (nil)", noop, notMesh, err)
}
if numProbes := fakeRT.NumProbes.Load(); numProbes != 0 {
t.Errorf("Unexpected number of probes, got %d, want %d", numProbes, 0)
}

// Test one pod probe errors (using mesh compatibly auto and probe optimization), even if one
// pod errors all probes should still be completed
fakeRT.NumProbes.Store(0)
rw.healthyPods = sets.NewString()
healthy, noop, notMesh, err = rw.probePodIPs(cur.ready, cur.notReady)
if !healthy.Equal(cur.ready) {
t.Errorf("Healthy does not match, got %#v, want %#v diff: %s", healthy, cur.ready, cmp.Diff(healthy, cur.ready))
}
if noop || !notMesh || err == nil {
t.Errorf("Unexpected values: noop=%t (false), notMesh=%t (true), err=%s (non-nil)", noop, notMesh, err)
}
if numProbes := fakeRT.NumProbes.Load(); numProbes != 3 {
t.Errorf("Unexpected number of probes, got %d, want %d", numProbes, 3)
}
}
36 changes: 33 additions & 3 deletions pkg/activator/testing/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"net/http/httptest"
"sync"
"time"

"go.uber.org/atomic"

Expand All @@ -30,9 +31,10 @@ import (

// FakeResponse is a response given by the FakeRoundTripper
type FakeResponse struct {
Err error
Code int
Body string
Err error
Code int
Body string
Delay time.Duration
}

// FakeRoundTripper is a roundtripper emulator useful in testing
Expand Down Expand Up @@ -110,9 +112,32 @@ func (rt *FakeRoundTripper) popResponse(host string) *FakeResponse {

// RT is a RoundTripperFunc
func (rt *FakeRoundTripper) RT(req *http.Request) (*http.Response, error) {
ctx := req.Context()

delayResponse := func(resp *FakeResponse) error {
// Delay if set before sending response
if resp.Delay != 0 {
timer := time.NewTimer(resp.Delay)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}
}

return nil
}

if req.Header.Get(netheader.ProbeKey) != "" {
rt.NumProbes.Inc()
resp := rt.popResponse(req.URL.Host)

err := delayResponse(resp)
if err != nil {
return nil, err
}

if resp.Err != nil {
return nil, resp.Err
}
Expand All @@ -137,6 +162,11 @@ func (rt *FakeRoundTripper) RT(req *http.Request) (*http.Response, error) {
resp = defaultRequestResponse()
}

err := delayResponse(resp)
if err != nil {
return nil, err
}

if resp.Err != nil {
return nil, resp.Err
}
Expand Down

0 comments on commit 997d8ef

Please sign in to comment.