From d3520ff4a12e86fd5acadb957c56a1623009fcc0 Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Wed, 14 Feb 2024 13:26:20 +0100 Subject: [PATCH] Wait for events with poll interval after finished event received (#7668) * Wait for events with poll interval after finished event received * Use PollUntilContextCancel * Change default timeout to 30 seconds * Call cancel * Simplify --- .../upgrade/prober/wathola/config/defaults.go | 3 ++- .../prober/wathola/config/defaults_test.go | 3 +++ .../prober/wathola/config/structure.go | 1 + test/upgrade/prober/wathola/event/services.go | 27 +++++++++++-------- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/test/upgrade/prober/wathola/config/defaults.go b/test/upgrade/prober/wathola/config/defaults.go index 58f8751fce8..9fd5d6b8d16 100644 --- a/test/upgrade/prober/wathola/config/defaults.go +++ b/test/upgrade/prober/wathola/config/defaults.go @@ -56,7 +56,8 @@ func defaultValues() *Config { Receiver: ReceiverConfig{ Port: port, Teardown: ReceiverTeardownConfig{ - Duration: 3 * time.Second, + Duration: 60 * time.Second, + Interval: 1 * time.Second, }, Progress: ReceiverProgressConfig{ Duration: time.Second, diff --git a/test/upgrade/prober/wathola/config/defaults_test.go b/test/upgrade/prober/wathola/config/defaults_test.go index 00b4fd86e9e..a00eaf191f4 100644 --- a/test/upgrade/prober/wathola/config/defaults_test.go +++ b/test/upgrade/prober/wathola/config/defaults_test.go @@ -26,4 +26,7 @@ func TestDefaultValues(t *testing.T) { assert.Condition(t, func() (success bool) { return Instance.Receiver.Teardown.Duration.Seconds() >= 1 }) + assert.Condition(t, func() (success bool) { + return Instance.Receiver.Teardown.Interval.Seconds() >= 1 + }) } diff --git a/test/upgrade/prober/wathola/config/structure.go b/test/upgrade/prober/wathola/config/structure.go index b0fffa0e238..39ee37cbf40 100644 --- a/test/upgrade/prober/wathola/config/structure.go +++ b/test/upgrade/prober/wathola/config/structure.go @@ -23,6 +23,7 @@ import ( // ReceiverTeardownConfig holds config receiver teardown type ReceiverTeardownConfig struct { Duration time.Duration + Interval time.Duration } // ReceiverProgressConfig holds config receiver progress reporting diff --git a/test/upgrade/prober/wathola/event/services.go b/test/upgrade/prober/wathola/event/services.go index f7f055b4ce9..ed91bafb6b6 100644 --- a/test/upgrade/prober/wathola/event/services.go +++ b/test/upgrade/prober/wathola/event/services.go @@ -16,10 +16,12 @@ package event import ( + "context" "fmt" "sync" "time" + "k8s.io/apimachinery/pkg/util/wait" "knative.dev/eventing/test/upgrade/prober/wathola/config" ) @@ -104,23 +106,26 @@ func (f *finishedStore) RegisterFinished(finished *Finished) { f.eventsSent = finished.EventsSent f.totalRequests = finished.TotalRequests log.Infof("finish event received, expecting %d event ware propagated", finished.EventsSent) - d := config.Instance.Receiver.Teardown.Duration - log.Infof("waiting additional %v to be sure all events came", d) - time.Sleep(d) - receivedEvents := f.steps.Count() - - if receivedEvents != finished.EventsSent && - // If sending was interrupted, tolerate one more received - // event as there's no way to check if the last event is delivered or not. - !(finished.SendingInterrupted && receivedEvents == finished.EventsSent+1) { + timeout := config.Instance.Receiver.Teardown.Duration + interval := config.Instance.Receiver.Teardown.Interval + + log.Infof("waiting additional %v to be sure all events came", timeout) + + if err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true /*immediate*/, func(context.Context) (bool, error) { + return f.steps.Count() == finished.EventsSent || + // If sending was interrupted, tolerate one more received + // event as there's no way to check if the last event is delivered or not. + (finished.SendingInterrupted && f.steps.Count() == finished.EventsSent+1), nil + }); err != nil { f.errors.throwUnexpected("expecting to have %v unique events received, "+ - "but received %v unique events", finished.EventsSent, receivedEvents) + "but received %v unique events", finished.EventsSent, f.steps.Count()) f.reportViolations(finished) f.errors.state = Failed } else { - log.Infof("properly received %d unique events", receivedEvents) + log.Infof("properly received %d unique events", f.steps.Count()) f.errors.state = Success } + // check down time for _, unavailablePeriod := range finished.UnavailablePeriods { if unavailablePeriod.Period > config.Instance.Receiver.Errors.UnavailablePeriodToReport {