From ac602c14aded9f26b59ed871f529b2c9de2643ea Mon Sep 17 00:00:00 2001 From: Andrew Senetar Date: Mon, 28 Aug 2023 20:25:59 +0000 Subject: [PATCH 1/3] fix(activator): Don't cancel all probes on one probe failure By using errgroup.WithContext, all probes are cancelled on the first error returned. This changes to use an errgroup without a context/cancellation. So all probes are allowed to run to completion and one failed probe does not cause all probes to exit. Ref #14200 --- pkg/activator/net/revision_backends.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/activator/net/revision_backends.go b/pkg/activator/net/revision_backends.go index c79e0ddc3ee0..726211b2d837 100644 --- a/pkg/activator/net/revision_backends.go +++ b/pkg/activator/net/revision_backends.go @@ -249,7 +249,7 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s ctx, cancel := context.WithTimeout(context.Background(), probeTimeout) defer cancel() - probeGroup, egCtx := errgroup.WithContext(ctx) + probeGroup := errgroup.Group{} healthyDests := make(chan string, dests.Len()) var probed bool @@ -264,7 +264,7 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s dest := dest // Standard Go concurrency pattern. probeGroup.Go(func() error { - ok, notMesh, err := rw.probe(egCtx, dest) + ok, notMesh, err := rw.probe(ctx, dest) if ok && (ready.Has(dest) || rw.enableProbeOptimisation) { healthyDests <- dest } From 278bfcc96927ed0f6eaf971cb52f465a8857d2df Mon Sep 17 00:00:00 2001 From: Andrew Senetar Date: Tue, 5 Sep 2023 15:09:13 +0000 Subject: [PATCH 2/3] fix(activator): Update tests to include pod IP probe tests - Add pod IP probe tests to directly test the pod IP probing behavior - Add test for no-probe optimization when all healthy - Add test for a pod returning an error to verify it does not block / cancel other probes (confirmed test with prior code which fails) - Update the fake roundtripper to support the new pod IP probe tests by introducing the Delay field to be used to delay the response. Default handling skips the delay if not set. - Add comment to errgroup change since this had been correct before and was incorrectly changed. (Additionally change to use original form.) --- pkg/activator/net/revision_backends.go | 4 +- pkg/activator/net/revision_backends_test.go | 67 +++++++++++++++++++++ pkg/activator/testing/roundtripper.go | 33 +++++++++- 3 files changed, 100 insertions(+), 4 deletions(-) diff --git a/pkg/activator/net/revision_backends.go b/pkg/activator/net/revision_backends.go index 726211b2d837..b2869d8a00de 100644 --- a/pkg/activator/net/revision_backends.go +++ b/pkg/activator/net/revision_backends.go @@ -249,7 +249,9 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.String) (succeeded s ctx, cancel := context.WithTimeout(context.Background(), probeTimeout) defer cancel() - probeGroup := errgroup.Group{} + // Empty errgroup is used as cancellation on first error is not desired, all probes should be + // attempted even if one fails. + var probeGroup errgroup.Group healthyDests := make(chan string, dests.Len()) var probed bool diff --git a/pkg/activator/net/revision_backends_test.go b/pkg/activator/net/revision_backends_test.go index a1959c4c5dae..193248e05a28 100644 --- a/pkg/activator/net/revision_backends_test.go +++ b/pkg/activator/net/revision_backends_test.go @@ -1519,3 +1519,70 @@ func TestServiceMoreThanOne(t *testing.T) { case <-time.After(updateTimeout): } } + +// More focused test around the probing of pods and the handling of different behaviors +func TestProbePodIPs(t *testing.T) { + fakeRT := activatortest.FakeRoundTripper{ + ExpectHost: testRevision, + ProbeHostResponses: map[string][]activatortest.FakeResponse{ + "10.10.1.1": {{ + Code: http.StatusInternalServerError, + Err: errors.New("podIP transport error"), + }}, + "10.10.1.2": {{ + Code: http.StatusOK, + Body: queue.Name, + Delay: 250 * time.Millisecond, + }}, + "10.10.1.3": {{ + Code: http.StatusOK, + Body: queue.Name, + }}, + }, + } + + // Minimally constructed revisionWatcher just to have what is needed for probing + rw := &revisionWatcher{ + clusterIPHealthy: true, + podsAddressable: true, + rev: types.NamespacedName{Namespace: testNamespace, Name: testRevision}, + logger: TestLogger(t), + transport: pkgnetwork.RoundTripperFunc(fakeRT.RT), + meshMode: netcfg.MeshCompatibilityModeAuto, + enableProbeOptimisation: true, + } + + // Initial state for tests + cur := dests{ + ready: sets.NewString("10.10.1.3", "10.10.1.2"), + notReady: sets.NewString("10.10.1.1"), + } + + // Test all pods healthy (skips probes) + rw.healthyPods = cur.ready.Union(cur.notReady) + healthy, noop, notMesh, err := rw.probePodIPs(cur.ready, cur.notReady) + if !healthy.Equal(rw.healthyPods) { + t.Errorf("Healthy does not match, got %#v, want %#v diff: %s", healthy, rw.healthyPods, cmp.Diff(healthy, cur.ready)) + } + if !noop || notMesh || err != nil { + t.Errorf("Unexpected values: noop=%t (true), notMesh=%t (false), err=%s (nil)", noop, notMesh, err) + } + if numProbes := fakeRT.NumProbes.Load(); numProbes != 0 { + t.Errorf("Unexpected number of probes, got %d, want %d", numProbes, 0) + } + + // Test one pod probe errors (using mesh compatibly auto and probe optimization), even if one + // pod errors all probes should still be completed + fakeRT.NumProbes.Store(0) + rw.healthyPods = sets.NewString() + healthy, noop, notMesh, err = rw.probePodIPs(cur.ready, cur.notReady) + if !healthy.Equal(cur.ready) { + t.Errorf("Healthy does not match, got %#v, want %#v diff: %s", healthy, cur.ready, cmp.Diff(healthy, cur.ready)) + } + if noop || !notMesh || err == nil { + t.Errorf("Unexpected values: noop=%t (false), notMesh=%t (true), err=%s (non-nil)", noop, notMesh, err) + } + if numProbes := fakeRT.NumProbes.Load(); numProbes != 3 { + t.Errorf("Unexpected number of probes, got %d, want %d", numProbes, 3) + } +} diff --git a/pkg/activator/testing/roundtripper.go b/pkg/activator/testing/roundtripper.go index 0ba7763fb0f5..38ac93704db8 100644 --- a/pkg/activator/testing/roundtripper.go +++ b/pkg/activator/testing/roundtripper.go @@ -21,6 +21,7 @@ import ( "net/http" "net/http/httptest" "sync" + "time" "go.uber.org/atomic" @@ -30,9 +31,10 @@ import ( // FakeResponse is a response given by the FakeRoundTripper type FakeResponse struct { - Err error - Code int - Body string + Err error + Code int + Body string + Delay time.Duration } // FakeRoundTripper is a roundtripper emulator useful in testing @@ -110,9 +112,23 @@ func (rt *FakeRoundTripper) popResponse(host string) *FakeResponse { // RT is a RoundTripperFunc func (rt *FakeRoundTripper) RT(req *http.Request) (*http.Response, error) { + ctx := req.Context() + if req.Header.Get(netheader.ProbeKey) != "" { rt.NumProbes.Inc() resp := rt.popResponse(req.URL.Host) + + // Delay if set before sending response + if resp.Delay.Seconds() != 0 { + timer := time.NewTimer(resp.Delay) + select { + case <-ctx.Done(): + timer.Stop() + return nil, ctx.Err() + case <-timer.C: + } + } + if resp.Err != nil { return nil, resp.Err } @@ -137,6 +153,17 @@ func (rt *FakeRoundTripper) RT(req *http.Request) (*http.Response, error) { resp = defaultRequestResponse() } + // Delay if set before sending response + if resp.Delay.Seconds() != 0 { + timer := time.NewTimer(resp.Delay) + select { + case <-ctx.Done(): + timer.Stop() + return nil, ctx.Err() + case <-timer.C: + } + } + if resp.Err != nil { return nil, resp.Err } From ba249c9665ebbed055cf8deea32ec20072973d5f Mon Sep 17 00:00:00 2001 From: Andrew Senetar Date: Mon, 11 Sep 2023 16:05:20 +0000 Subject: [PATCH 3/3] refactor(activator): Refactor roundtripper for test Update to use a function for the delay to reduce duplicate code. --- pkg/activator/testing/roundtripper.go | 33 +++++++++++++++------------ 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/activator/testing/roundtripper.go b/pkg/activator/testing/roundtripper.go index 38ac93704db8..8a4c05a8005d 100644 --- a/pkg/activator/testing/roundtripper.go +++ b/pkg/activator/testing/roundtripper.go @@ -114,21 +114,30 @@ func (rt *FakeRoundTripper) popResponse(host string) *FakeResponse { func (rt *FakeRoundTripper) RT(req *http.Request) (*http.Response, error) { ctx := req.Context() - if req.Header.Get(netheader.ProbeKey) != "" { - rt.NumProbes.Inc() - resp := rt.popResponse(req.URL.Host) - + delayResponse := func(resp *FakeResponse) error { // Delay if set before sending response - if resp.Delay.Seconds() != 0 { + if resp.Delay != 0 { timer := time.NewTimer(resp.Delay) select { case <-ctx.Done(): timer.Stop() - return nil, ctx.Err() + return ctx.Err() case <-timer.C: } } + return nil + } + + if req.Header.Get(netheader.ProbeKey) != "" { + rt.NumProbes.Inc() + resp := rt.popResponse(req.URL.Host) + + err := delayResponse(resp) + if err != nil { + return nil, err + } + if resp.Err != nil { return nil, resp.Err } @@ -153,15 +162,9 @@ func (rt *FakeRoundTripper) RT(req *http.Request) (*http.Response, error) { resp = defaultRequestResponse() } - // Delay if set before sending response - if resp.Delay.Seconds() != 0 { - timer := time.NewTimer(resp.Delay) - select { - case <-ctx.Done(): - timer.Stop() - return nil, ctx.Err() - case <-timer.C: - } + err := delayResponse(resp) + if err != nil { + return nil, err } if resp.Err != nil {