Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

monitoring: use instrumented backoff as default for waiting #71

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions monitoring/instrumented.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func WithInstrumentedScheme(scheme string) InstrumentedOption {
}
}

// WithInstrumentedWaitBackoff allows customizing wait backoff when accessing metric endpoint.
// WithInstrumentedWaitBackoff allows customizing wait backoff when accessing or asserting on the metric endpoint.
func WithInstrumentedWaitBackoff(waitBackoff *backoff.Backoff) InstrumentedOption {
return func(o *rOpt) {
o.waitBackoff = waitBackoff
Expand Down Expand Up @@ -144,6 +144,17 @@ func (r *InstrumentedRunnable) Metrics() (_ string, err error) {
return string(body), err
}

func (r *InstrumentedRunnable) buildMetricsOptions(opts []MetricsOption) metricsOptions {
result := metricsOptions{
getValue: getMetricValue,
waitBackoff: r.waitBackoff,
}
for _, opt := range opts {
opt(&result)
}
return result
}

// WaitSumMetrics waits for at least one instance of each given metric names to be present and their sums,
// returning true when passed to given expected(...).
func (r *InstrumentedRunnable) WaitSumMetrics(expected MetricValueExpectation, metricNames ...string) error {
Expand All @@ -154,14 +165,13 @@ func (r *InstrumentedRunnable) WaitSumMetricsWithOptions(expected MetricValueExp
var (
sums []float64
err error
options = buildMetricsOptions(opts)
options = r.buildMetricsOptions(opts)
)

metricsWaitBackoff := backoff.New(context.Background(), *options.waitBackoff)
for metricsWaitBackoff.Reset(); metricsWaitBackoff.Ongoing(); {
for options.waitBackoff.Reset(); options.waitBackoff.Ongoing(); {
sums, err = r.SumMetrics(metricNames, opts...)
if options.waitMissingMetrics && errors.Is(err, errMissingMetric) {
metricsWaitBackoff.Wait()
options.waitBackoff.Wait()
continue
}
if err != nil {
Expand All @@ -171,15 +181,14 @@ func (r *InstrumentedRunnable) WaitSumMetricsWithOptions(expected MetricValueExp
if expected(sums...) {
return nil
}

metricsWaitBackoff.Wait()
options.waitBackoff.Wait()
}
return errors.Newf("unable to find metrics %s with expected values after %d retries. Last error: %v. Last values: %v", metricNames, metricsWaitBackoff.NumRetries(), err, sums)
return errors.Newf("unable to find metrics %s with expected values after %d retries. Last error: %v. Last values: %v", metricNames, options.waitBackoff.NumRetries(), err, sums)
}

// SumMetrics returns the sum of the values of each given metric names.
func (r *InstrumentedRunnable) SumMetrics(metricNames []string, opts ...MetricsOption) ([]float64, error) {
options := buildMetricsOptions(opts)
options := r.buildMetricsOptions(opts)
sums := make([]float64, len(metricNames))

metrics, err := r.Metrics()
Expand Down Expand Up @@ -224,9 +233,9 @@ func (r *InstrumentedRunnable) SumMetrics(metricNames []string, opts ...MetricsO

// WaitRemovedMetric waits until a metric disappear from the list of metrics exported by the service.
func (r *InstrumentedRunnable) WaitRemovedMetric(metricName string, opts ...MetricsOption) error {
options := buildMetricsOptions(opts)
options := r.buildMetricsOptions(opts)

for r.waitBackoff.Reset(); r.waitBackoff.Ongoing(); {
for options.waitBackoff.Reset(); options.waitBackoff.Ongoing(); {
// Fetch metrics.
metrics, err := r.Metrics()
if err != nil {
Expand All @@ -251,7 +260,7 @@ func (r *InstrumentedRunnable) WaitRemovedMetric(metricName string, opts ...Metr
return nil
}

r.waitBackoff.Wait()
options.waitBackoff.Wait()
}

return errors.Newf("the metric %s is still exported by %s", metricName, r.Name())
Expand Down
19 changes: 15 additions & 4 deletions monitoring/instrumented_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ func (r *CompositeInstrumentedRunnable) MetricTargets() (ret []Target) {
return ret
}

func (r *CompositeInstrumentedRunnable) buildMetricsOptions(opts []MetricsOption) metricsOptions {
result := metricsOptions{
getValue: getMetricValue,
waitBackoff: r.backoff,
}
for _, opt := range opts {
opt(&result)
}
return result
}

// WaitSumMetrics waits for at least one instance of each given metric names to be present and their sums, returning true
// when passed to given expected(...).
func (r *CompositeInstrumentedRunnable) WaitSumMetrics(expected MetricValueExpectation, metricNames ...string) error {
Expand All @@ -51,13 +62,13 @@ func (r *CompositeInstrumentedRunnable) WaitSumMetricsWithOptions(expected Metri
var (
sums []float64
err error
options = buildMetricsOptions(opts)
options = r.buildMetricsOptions(opts)
)

for r.backoff.Reset(); r.backoff.Ongoing(); {
for options.waitBackoff.Reset(); options.waitBackoff.Ongoing(); {
sums, err = r.SumMetrics(metricNames, opts...)
if options.waitMissingMetrics && errors.Is(err, errMissingMetric) {
r.backoff.Wait()
options.waitBackoff.Wait()
continue
}
if err != nil {
Expand All @@ -68,7 +79,7 @@ func (r *CompositeInstrumentedRunnable) WaitSumMetricsWithOptions(expected Metri
return nil
}

r.backoff.Wait()
options.waitBackoff.Wait()
}

return errors.Wrapf(err, "unable to find metrics %s with expected values. Last values: %v", metricNames, sums)
Expand Down
19 changes: 19 additions & 0 deletions monitoring/instrumented_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type runnableFake struct {
e2e.Runnable

running bool
endpoints map[string]string
}

Expand All @@ -30,6 +31,22 @@ func (r *runnableFake) Endpoint(portName string) string {
return r.endpoints[portName]
}

func (r *runnableFake) IsRunning() bool {
return r.running
}

func (r *runnableFake) SetMetadata(k, v any) {
}

func (r *runnableFake) Name() string {
return "fake"
}

func (r *runnableFake) Start() error {
r.running = true
return nil
}

func TestWaitSumMetric(t *testing.T) {
// Listen on a random port before starting the HTTP server, to
// make sure the port is already open when we'll call WaitSumMetric()
Expand Down Expand Up @@ -86,6 +103,7 @@ metric_b_summary_count 1
Max: 600 * time.Millisecond,
MaxRetries: 50,
})))
testutil.Ok(t, r.Start())

testutil.Ok(t, r.WaitSumMetrics(Equals(221), "metric_a"))

Expand Down Expand Up @@ -159,6 +177,7 @@ metric_b 1000
Max: 600 * time.Millisecond,
MaxRetries: 50,
})))
testutil.Ok(t, r.Start())

testutil.Ok(t, r.WaitSumMetrics(Equals(math.NaN()), "metric_a"))
}
21 changes: 3 additions & 18 deletions monitoring/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package e2emon

import (
"context"
"math"
"time"

"github.com/efficientgo/core/backoff"
"github.com/efficientgo/e2e/monitoring/matchers"
Expand All @@ -24,13 +24,13 @@ type metricsOptions struct {
labelMatchers []*matchers.Matcher
waitMissingMetrics bool
skipMissingMetrics bool
waitBackoff *backoff.Config
waitBackoff *backoff.Backoff
}

// WithWaitBackoff is an option to configure a backoff when waiting on a metric value.
func WithWaitBackoff(backoffConfig *backoff.Config) MetricsOption {
return func(o *metricsOptions) {
o.waitBackoff = backoffConfig
o.waitBackoff = backoff.New(context.Background(), *backoffConfig)
Copy link
Collaborator

@yeya24 yeya24 Oct 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we allow passing ctx? Maybe we can add another function WithWaitBackoffCtx to avoid too many changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets do this in a followup PR!

}
}

Expand Down Expand Up @@ -63,21 +63,6 @@ func SkipMissingMetrics() MetricsOption {
}
}

func buildMetricsOptions(opts []MetricsOption) metricsOptions {
result := metricsOptions{
getValue: getMetricValue,
waitBackoff: &backoff.Config{
Min: 300 * time.Millisecond,
Max: 600 * time.Millisecond,
MaxRetries: 50,
},
}
for _, opt := range opts {
opt(&result)
}
return result
}

func getMetricValue(m *io_prometheus_client.Metric) float64 {
if m.GetGauge() != nil {
return m.GetGauge().GetValue()
Expand Down
Loading