Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query-tee: fix behaviour of -proxy.compare-skip-recent-samples for long-running queries #9416

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@
* [ENHANCEMENT] Optionally consider equivalent error messages the same when comparing responses. Enabled by default, disable with `-proxy.require-exact-error-match=true`. #9143 #9350 #9366
* [BUGFIX] Ensure any errors encountered while forwarding a request to a backend (eg. DNS resolution failures) are logged. #8419
* [BUGFIX] The comparison of the results should not fail when either side contains extra samples from within SkipRecentSamples duration. #8920
* [BUGFIX] When `-proxy.compare-skip-recent-samples` is enabled, compare sample timestamps with the time the query requests were made, rather than the time at which the comparison is occurring. #9416

### Documentation

Expand Down
25 changes: 13 additions & 12 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type ResponsesComparator interface {
Compare(expected, actual []byte) (ComparisonResult, error)
Compare(expected, actual []byte, queryEvaluationTime time.Time) (ComparisonResult, error)
}

type ProxyEndpoint struct {
Expand Down Expand Up @@ -102,14 +102,15 @@ func (p *ProxyEndpoint) selectBackends() []ProxyBackendInterface {

func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []ProxyBackendInterface, resCh chan *backendResponse) {
var (
wg = sync.WaitGroup{}
err error
body []byte
responses = make([]*backendResponse, 0, len(backends))
responsesMtx = sync.Mutex{}
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
wg = sync.WaitGroup{}
err error
body []byte
responses = make([]*backendResponse, 0, len(backends))
responsesMtx = sync.Mutex{}
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
evaluationTime = time.Now()
)

defer logger.Finish()
Expand Down Expand Up @@ -258,7 +259,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
expectedResponse, actualResponse = actualResponse, expectedResponse
}

result, err := p.compareResponses(expectedResponse, actualResponse)
result, err := p.compareResponses(expectedResponse, actualResponse, evaluationTime)
if result == ComparisonFailed {
level.Error(logger).Log(
"msg", "response comparison failed",
Expand Down Expand Up @@ -314,7 +315,7 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp
return firstResponse
}

func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) (ComparisonResult, error) {
func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse, queryEvaluationTime time.Time) (ComparisonResult, error) {
if expectedResponse.err != nil {
return ComparisonFailed, fmt.Errorf("skipped comparison of response because the request to the preferred backend failed: %w", expectedResponse.err)
}
Expand All @@ -335,7 +336,7 @@ func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backe
return ComparisonSkipped, fmt.Errorf("skipped comparison of response because the response from the secondary backend contained an unexpected content type '%s', expected 'application/json'", actualResponse.contentType)
}

return p.comparator.Compare(expectedResponse.body, actualResponse.body)
return p.comparator.Compare(expectedResponse.body, actualResponse.body, queryEvaluationTime)
}

type backendResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ type mockComparator struct {
comparisonError error
}

func (m *mockComparator) Compare(_, _ []byte) (ComparisonResult, error) {
func (m *mockComparator) Compare(_, _ []byte, _ time.Time) (ComparisonResult, error) {
return m.comparisonResult, m.comparisonError
}

Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var testRoutes = []Route{

type testComparator struct{}

func (testComparator) Compare(_, _ []byte) (ComparisonResult, error) {
func (testComparator) Compare(_, _ []byte, _ time.Time) (ComparisonResult, error) {
return ComparisonSuccess, nil
}

Expand Down
103 changes: 59 additions & 44 deletions tools/querytee/response_comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes.
type SamplesComparatorFunc func(expected, actual json.RawMessage, opts SampleComparisonOptions) error
type SamplesComparatorFunc func(expected, actual json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error

type SamplesResponse struct {
Status string
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *SamplesComparator) RegisterSamplesType(samplesType string, comparator S
s.sampleTypesComparator[samplesType] = comparator
}

func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (ComparisonResult, error) {
func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte, queryEvaluationTime time.Time) (ComparisonResult, error) {
var expected, actual SamplesResponse

err := json.Unmarshal(expectedResponse, &expected)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (Co
return ComparisonFailed, fmt.Errorf("resultType %s not registered for comparison", expected.Data.ResultType)
}

if err := comparator(expected.Data.Result, actual.Data.Result, s.opts); err != nil {
if err := comparator(expected.Data.Result, actual.Data.Result, queryEvaluationTime, s.opts); err != nil {
return ComparisonFailed, err
}

Expand Down Expand Up @@ -208,7 +208,7 @@ func formatAnnotationsForErrorMessage(warnings []string) string {
return "[" + strings.Join(formatted, ", ") + "]"
}

func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) error {
func compareMatrix(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
var expected, actual model.Matrix

err := json.Unmarshal(expectedRaw, &expected)
Expand All @@ -220,7 +220,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return err
}

if allMatrixSamplesWithinRecentSampleWindow(expected, opts) && allMatrixSamplesWithinRecentSampleWindow(actual, opts) {
if allMatrixSamplesWithinRecentSampleWindow(expected, queryEvaluationTime, opts) && allMatrixSamplesWithinRecentSampleWindow(actual, queryEvaluationTime, opts) {
return nil
}

Expand All @@ -240,7 +240,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
}
actualMetric := actual[actualMetricIndex]

err := compareMatrixSamples(expectedMetric, actualMetric, opts)
err := compareMatrixSamples(expectedMetric, actualMetric, queryEvaluationTime, opts)
if err != nil {
return fmt.Errorf("%w\nExpected result for series:\n%v\n\nActual result for series:\n%v", err, expectedMetric, actualMetric)
}
Expand All @@ -249,9 +249,9 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return nil
}

func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleComparisonOptions) error {
func compareMatrixSamples(expected, actual *model.SampleStream, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
expectedSamplesTail, actualSamplesTail, err := comparePairs(expected.Values, actual.Values, func(p1 model.SamplePair, p2 model.SamplePair) error {
err := compareSamplePair(p1, p2, opts)
err := compareSamplePair(p1, p2, queryEvaluationTime, opts)
if err != nil {
return fmt.Errorf("float sample pair does not match for metric %s: %w", expected.Metric, err)
}
Expand All @@ -262,7 +262,7 @@ func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleCompa
}

expectedHistogramSamplesTail, actualHistogramSamplesTail, err := comparePairs(expected.Histograms, actual.Histograms, func(p1 model.SampleHistogramPair, p2 model.SampleHistogramPair) error {
err := compareSampleHistogramPair(p1, p2, opts)
err := compareSampleHistogramPair(p1, p2, queryEvaluationTime, opts)
if err != nil {
return fmt.Errorf("histogram sample pair does not match for metric %s: %w", expected.Metric, err)
}
Expand All @@ -282,11 +282,11 @@ func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleCompa
}

skipAllRecentFloatSamples := canSkipAllSamples(func(p model.SamplePair) bool {
return time.Since(p.Timestamp.Time())-opts.SkipRecentSamples < 0
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0
}, expectedSamplesTail, actualSamplesTail)

skipAllRecentHistogramSamples := canSkipAllSamples(func(p model.SampleHistogramPair) bool {
return time.Since(p.Timestamp.Time())-opts.SkipRecentSamples < 0
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0
}, expectedHistogramSamplesTail, actualHistogramSamplesTail)

if skipAllRecentFloatSamples && skipAllRecentHistogramSamples {
Expand Down Expand Up @@ -356,20 +356,20 @@ func canSkipAllSamples[S ~[]M, M any](skip func(M) bool, ss ...S) bool {
return true
}

func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, opts SampleComparisonOptions) bool {
func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 {
return false
}

for _, series := range m {
for _, sample := range series.Values {
if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
return false
}
}

for _, sample := range series.Histograms {
if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
return false
}
}
Expand All @@ -378,7 +378,7 @@ func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, opts SampleCompari
return true
}

func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) error {
func compareVector(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
var expected, actual model.Vector

err := json.Unmarshal(expectedRaw, &expected)
Expand All @@ -391,7 +391,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return err
}

if allVectorSamplesWithinRecentSampleWindow(expected, opts) && allVectorSamplesWithinRecentSampleWindow(actual, opts) {
if allVectorSamplesWithinRecentSampleWindow(expected, queryEvaluationTime, opts) && allVectorSamplesWithinRecentSampleWindow(actual, queryEvaluationTime, opts) {
return nil
}

Expand All @@ -413,13 +413,18 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
actualMetric := actual[actualMetricIndex]

if expectedMetric.Histogram == nil && actualMetric.Histogram == nil {
err := compareSamplePair(model.SamplePair{
Timestamp: expectedMetric.Timestamp,
Value: expectedMetric.Value,
}, model.SamplePair{
Timestamp: actualMetric.Timestamp,
Value: actualMetric.Value,
}, opts)
err := compareSamplePair(
model.SamplePair{
Timestamp: expectedMetric.Timestamp,
Value: expectedMetric.Value,
},
model.SamplePair{
Timestamp: actualMetric.Timestamp,
Value: actualMetric.Value,
},
queryEvaluationTime,
opts,
)
if err != nil {
return fmt.Errorf("float sample pair does not match for metric %s: %w", expectedMetric.Metric, err)
}
Expand All @@ -428,13 +433,18 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
} else if expectedMetric.Histogram == nil && actualMetric.Histogram != nil {
return fmt.Errorf("sample pair does not match for metric %s: expected float value but got histogram", expectedMetric.Metric)
} else { // Expected value is a histogram and the actual value is a histogram.
err := compareSampleHistogramPair(model.SampleHistogramPair{
Timestamp: expectedMetric.Timestamp,
Histogram: expectedMetric.Histogram,
}, model.SampleHistogramPair{
Timestamp: actualMetric.Timestamp,
Histogram: actualMetric.Histogram,
}, opts)
err := compareSampleHistogramPair(
model.SampleHistogramPair{
Timestamp: expectedMetric.Timestamp,
Histogram: expectedMetric.Histogram,
},
model.SampleHistogramPair{
Timestamp: actualMetric.Timestamp,
Histogram: actualMetric.Histogram,
},
queryEvaluationTime,
opts,
)
if err != nil {
return fmt.Errorf("histogram sample pair does not match for metric %s: %w", expectedMetric.Metric, err)
}
Expand All @@ -444,21 +454,21 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return nil
}

func allVectorSamplesWithinRecentSampleWindow(v model.Vector, opts SampleComparisonOptions) bool {
func allVectorSamplesWithinRecentSampleWindow(v model.Vector, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 {
return false
}

for _, sample := range v {
if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
return false
}
}

return true
}

func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) error {
func compareScalar(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
var expected, actual model.Scalar
err := json.Unmarshal(expectedRaw, &expected)
if err != nil {
Expand All @@ -470,20 +480,25 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return err
}

return compareSamplePair(model.SamplePair{
Timestamp: expected.Timestamp,
Value: expected.Value,
}, model.SamplePair{
Timestamp: actual.Timestamp,
Value: actual.Value,
}, opts)
return compareSamplePair(
model.SamplePair{
Timestamp: expected.Timestamp,
Value: expected.Value,
},
model.SamplePair{
Timestamp: actual.Timestamp,
Value: actual.Value,
},
queryEvaluationTime,
opts,
)
}

func compareSamplePair(expected, actual model.SamplePair, opts SampleComparisonOptions) error {
func compareSamplePair(expected, actual model.SamplePair, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
if expected.Timestamp != actual.Timestamp {
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp)
}
if opts.SkipRecentSamples > 0 && time.Since(expected.Timestamp.Time()) < opts.SkipRecentSamples {
if opts.SkipRecentSamples > 0 && queryEvaluationTime.Sub(expected.Timestamp.Time()) < opts.SkipRecentSamples {
return nil
}
if !compareSampleValue(float64(expected.Value), float64(actual.Value), opts) {
Expand All @@ -507,12 +522,12 @@ func compareSampleValue(first, second float64, opts SampleComparisonOptions) boo
return math.Abs(first-second) <= opts.Tolerance
}

func compareSampleHistogramPair(expected, actual model.SampleHistogramPair, opts SampleComparisonOptions) error {
func compareSampleHistogramPair(expected, actual model.SampleHistogramPair, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
if expected.Timestamp != actual.Timestamp {
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp)
}

if opts.SkipRecentSamples > 0 && time.Since(expected.Timestamp.Time()) < opts.SkipRecentSamples {
if opts.SkipRecentSamples > 0 && queryEvaluationTime.Sub(expected.Timestamp.Time()) < opts.SkipRecentSamples {
return nil
}

Expand Down
Loading
Loading