Skip to content

Commit

Permalink
[release-v1.12] Wait for events with poll interval after finished eve…
Browse files Browse the repository at this point in the history
…nt received (knative#7668) (#580)

* Wait for events with poll interval after finished event received (knative#7668)

* Wait for events with poll interval after finished event received

* Use PollUntilContextCancel

* Change default timeout to 30 seconds

* Call cancel

* Simplify

* wathola receiver, annotate span by step number (knative#7667)

wathole receiver, annotate span by step number

---------

Co-authored-by: Martin Gencur <mgencur@redhat.com>
Co-authored-by: Marek Schmidt <maschmid@redhat.com>
  • Loading branch information
3 people authored Apr 2, 2024
1 parent edb0e10 commit cbc4ca3
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
3 changes: 2 additions & 1 deletion test/upgrade/prober/wathola/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func defaultValues() *Config {
Receiver: ReceiverConfig{
Port: port,
Teardown: ReceiverTeardownConfig{
Duration: 3 * time.Second,
Duration: 60 * time.Second,
Interval: 1 * time.Second,
},
Progress: ReceiverProgressConfig{
Duration: time.Second,
Expand Down
3 changes: 3 additions & 0 deletions test/upgrade/prober/wathola/config/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ func TestDefaultValues(t *testing.T) {
assert.Condition(t, func() (success bool) {
return Instance.Receiver.Teardown.Duration.Seconds() >= 1
})
assert.Condition(t, func() (success bool) {
return Instance.Receiver.Teardown.Interval.Seconds() >= 1
})
}
1 change: 1 addition & 0 deletions test/upgrade/prober/wathola/config/structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// ReceiverTeardownConfig holds config receiver teardown
type ReceiverTeardownConfig struct {
Duration time.Duration
Interval time.Duration
}

// ReceiverProgressConfig holds config receiver progress reporting
Expand Down
27 changes: 16 additions & 11 deletions test/upgrade/prober/wathola/event/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package event

import (
"context"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/eventing/test/upgrade/prober/wathola/config"
)

Expand Down Expand Up @@ -104,23 +106,26 @@ func (f *finishedStore) RegisterFinished(finished *Finished) {
f.eventsSent = finished.EventsSent
f.totalRequests = finished.TotalRequests
log.Infof("finish event received, expecting %d event ware propagated", finished.EventsSent)
d := config.Instance.Receiver.Teardown.Duration
log.Infof("waiting additional %v to be sure all events came", d)
time.Sleep(d)
receivedEvents := f.steps.Count()

if receivedEvents != finished.EventsSent &&
// If sending was interrupted, tolerate one more received
// event as there's no way to check if the last event is delivered or not.
!(finished.SendingInterrupted && receivedEvents == finished.EventsSent+1) {
timeout := config.Instance.Receiver.Teardown.Duration
interval := config.Instance.Receiver.Teardown.Interval

log.Infof("waiting additional %v to be sure all events came", timeout)

if err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true /*immediate*/, func(context.Context) (bool, error) {
return f.steps.Count() == finished.EventsSent ||
// If sending was interrupted, tolerate one more received
// event as there's no way to check if the last event is delivered or not.
(finished.SendingInterrupted && f.steps.Count() == finished.EventsSent+1), nil
}); err != nil {
f.errors.throwUnexpected("expecting to have %v unique events received, "+
"but received %v unique events", finished.EventsSent, receivedEvents)
"but received %v unique events", finished.EventsSent, f.steps.Count())
f.reportViolations(finished)
f.errors.state = Failed
} else {
log.Infof("properly received %d unique events", receivedEvents)
log.Infof("properly received %d unique events", f.steps.Count())
f.errors.state = Success
}

// check down time
for _, unavailablePeriod := range finished.UnavailablePeriods {
if unavailablePeriod.Period > config.Instance.Receiver.Errors.UnavailablePeriodToReport {
Expand Down
1 change: 1 addition & 0 deletions test/upgrade/prober/wathola/receiver/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (r receiver) receiveEvent(ctx context.Context, e cloudevents.Event) {
if err != nil {
log.Fatal(err)
}
span.AddAttributes(trace.Int64Attribute("step", int64(step.Number)))
r.step.RegisterStep(step)
}
if t == event.FinishedType {
Expand Down

0 comments on commit cbc4ca3

Please sign in to comment.