Skip to content

Commit

Permalink
query-tee: fix behaviour of -proxy.compare-skip-recent-samples for …
Browse files Browse the repository at this point in the history
…long-running queries (#9416)

* query-tee: fix behaviour of `-proxy.compare-skip-recent-samples` for long-running queries

* Add changelog entry
  • Loading branch information
charleskorn authored Sep 26, 2024
1 parent 9406e4f commit 3509c46
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@
* [ENHANCEMENT] Optionally consider equivalent error messages the same when comparing responses. Enabled by default, disable with `-proxy.require-exact-error-match=true`. #9143 #9350 #9366
* [BUGFIX] Ensure any errors encountered while forwarding a request to a backend (eg. DNS resolution failures) are logged. #8419
* [BUGFIX] The comparison of the results should not fail when either side contains extra samples from within SkipRecentSamples duration. #8920
* [BUGFIX] When `-proxy.compare-skip-recent-samples` is enabled, compare sample timestamps with the time the query requests were made, rather than the time at which the comparison is occurring. #9416

### Documentation

Expand Down
25 changes: 13 additions & 12 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type ResponsesComparator interface {
Compare(expected, actual []byte) (ComparisonResult, error)
Compare(expected, actual []byte, queryEvaluationTime time.Time) (ComparisonResult, error)
}

type ProxyEndpoint struct {
Expand Down Expand Up @@ -102,14 +102,15 @@ func (p *ProxyEndpoint) selectBackends() []ProxyBackendInterface {

func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []ProxyBackendInterface, resCh chan *backendResponse) {
var (
wg = sync.WaitGroup{}
err error
body []byte
responses = make([]*backendResponse, 0, len(backends))
responsesMtx = sync.Mutex{}
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
wg = sync.WaitGroup{}
err error
body []byte
responses = make([]*backendResponse, 0, len(backends))
responsesMtx = sync.Mutex{}
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
evaluationTime = time.Now()
)

defer logger.Finish()
Expand Down Expand Up @@ -258,7 +259,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
expectedResponse, actualResponse = actualResponse, expectedResponse
}

result, err := p.compareResponses(expectedResponse, actualResponse)
result, err := p.compareResponses(expectedResponse, actualResponse, evaluationTime)
if result == ComparisonFailed {
level.Error(logger).Log(
"msg", "response comparison failed",
Expand Down Expand Up @@ -314,7 +315,7 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResp
return firstResponse
}

func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) (ComparisonResult, error) {
func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse, queryEvaluationTime time.Time) (ComparisonResult, error) {
if expectedResponse.err != nil {
return ComparisonFailed, fmt.Errorf("skipped comparison of response because the request to the preferred backend failed: %w", expectedResponse.err)
}
Expand All @@ -335,7 +336,7 @@ func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backe
return ComparisonSkipped, fmt.Errorf("skipped comparison of response because the response from the secondary backend contained an unexpected content type '%s', expected 'application/json'", actualResponse.contentType)
}

return p.comparator.Compare(expectedResponse.body, actualResponse.body)
return p.comparator.Compare(expectedResponse.body, actualResponse.body, queryEvaluationTime)
}

type backendResponse struct {
Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ type mockComparator struct {
comparisonError error
}

func (m *mockComparator) Compare(_, _ []byte) (ComparisonResult, error) {
func (m *mockComparator) Compare(_, _ []byte, _ time.Time) (ComparisonResult, error) {
return m.comparisonResult, m.comparisonError
}

Expand Down
2 changes: 1 addition & 1 deletion tools/querytee/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var testRoutes = []Route{

type testComparator struct{}

func (testComparator) Compare(_, _ []byte) (ComparisonResult, error) {
func (testComparator) Compare(_, _ []byte, _ time.Time) (ComparisonResult, error) {
return ComparisonSuccess, nil
}

Expand Down
103 changes: 59 additions & 44 deletions tools/querytee/response_comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes.
type SamplesComparatorFunc func(expected, actual json.RawMessage, opts SampleComparisonOptions) error
type SamplesComparatorFunc func(expected, actual json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error

type SamplesResponse struct {
Status string
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *SamplesComparator) RegisterSamplesType(samplesType string, comparator S
s.sampleTypesComparator[samplesType] = comparator
}

func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (ComparisonResult, error) {
func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte, queryEvaluationTime time.Time) (ComparisonResult, error) {
var expected, actual SamplesResponse

err := json.Unmarshal(expectedResponse, &expected)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) (Co
return ComparisonFailed, fmt.Errorf("resultType %s not registered for comparison", expected.Data.ResultType)
}

if err := comparator(expected.Data.Result, actual.Data.Result, s.opts); err != nil {
if err := comparator(expected.Data.Result, actual.Data.Result, queryEvaluationTime, s.opts); err != nil {
return ComparisonFailed, err
}

Expand Down Expand Up @@ -208,7 +208,7 @@ func formatAnnotationsForErrorMessage(warnings []string) string {
return "[" + strings.Join(formatted, ", ") + "]"
}

func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) error {
func compareMatrix(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
var expected, actual model.Matrix

err := json.Unmarshal(expectedRaw, &expected)
Expand All @@ -220,7 +220,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return err
}

if allMatrixSamplesWithinRecentSampleWindow(expected, opts) && allMatrixSamplesWithinRecentSampleWindow(actual, opts) {
if allMatrixSamplesWithinRecentSampleWindow(expected, queryEvaluationTime, opts) && allMatrixSamplesWithinRecentSampleWindow(actual, queryEvaluationTime, opts) {
return nil
}

Expand All @@ -240,7 +240,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
}
actualMetric := actual[actualMetricIndex]

err := compareMatrixSamples(expectedMetric, actualMetric, opts)
err := compareMatrixSamples(expectedMetric, actualMetric, queryEvaluationTime, opts)
if err != nil {
return fmt.Errorf("%w\nExpected result for series:\n%v\n\nActual result for series:\n%v", err, expectedMetric, actualMetric)
}
Expand All @@ -249,9 +249,9 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return nil
}

func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleComparisonOptions) error {
func compareMatrixSamples(expected, actual *model.SampleStream, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
expectedSamplesTail, actualSamplesTail, err := comparePairs(expected.Values, actual.Values, func(p1 model.SamplePair, p2 model.SamplePair) error {
err := compareSamplePair(p1, p2, opts)
err := compareSamplePair(p1, p2, queryEvaluationTime, opts)
if err != nil {
return fmt.Errorf("float sample pair does not match for metric %s: %w", expected.Metric, err)
}
Expand All @@ -262,7 +262,7 @@ func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleCompa
}

expectedHistogramSamplesTail, actualHistogramSamplesTail, err := comparePairs(expected.Histograms, actual.Histograms, func(p1 model.SampleHistogramPair, p2 model.SampleHistogramPair) error {
err := compareSampleHistogramPair(p1, p2, opts)
err := compareSampleHistogramPair(p1, p2, queryEvaluationTime, opts)
if err != nil {
return fmt.Errorf("histogram sample pair does not match for metric %s: %w", expected.Metric, err)
}
Expand All @@ -282,11 +282,11 @@ func compareMatrixSamples(expected, actual *model.SampleStream, opts SampleCompa
}

skipAllRecentFloatSamples := canSkipAllSamples(func(p model.SamplePair) bool {
return time.Since(p.Timestamp.Time())-opts.SkipRecentSamples < 0
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0
}, expectedSamplesTail, actualSamplesTail)

skipAllRecentHistogramSamples := canSkipAllSamples(func(p model.SampleHistogramPair) bool {
return time.Since(p.Timestamp.Time())-opts.SkipRecentSamples < 0
return queryEvaluationTime.Sub(p.Timestamp.Time())-opts.SkipRecentSamples < 0
}, expectedHistogramSamplesTail, actualHistogramSamplesTail)

if skipAllRecentFloatSamples && skipAllRecentHistogramSamples {
Expand Down Expand Up @@ -356,20 +356,20 @@ func canSkipAllSamples[S ~[]M, M any](skip func(M) bool, ss ...S) bool {
return true
}

func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, opts SampleComparisonOptions) bool {
func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 {
return false
}

for _, series := range m {
for _, sample := range series.Values {
if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
return false
}
}

for _, sample := range series.Histograms {
if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
return false
}
}
Expand All @@ -378,7 +378,7 @@ func allMatrixSamplesWithinRecentSampleWindow(m model.Matrix, opts SampleCompari
return true
}

func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) error {
func compareVector(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
var expected, actual model.Vector

err := json.Unmarshal(expectedRaw, &expected)
Expand All @@ -391,7 +391,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return err
}

if allVectorSamplesWithinRecentSampleWindow(expected, opts) && allVectorSamplesWithinRecentSampleWindow(actual, opts) {
if allVectorSamplesWithinRecentSampleWindow(expected, queryEvaluationTime, opts) && allVectorSamplesWithinRecentSampleWindow(actual, queryEvaluationTime, opts) {
return nil
}

Expand All @@ -413,13 +413,18 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
actualMetric := actual[actualMetricIndex]

if expectedMetric.Histogram == nil && actualMetric.Histogram == nil {
err := compareSamplePair(model.SamplePair{
Timestamp: expectedMetric.Timestamp,
Value: expectedMetric.Value,
}, model.SamplePair{
Timestamp: actualMetric.Timestamp,
Value: actualMetric.Value,
}, opts)
err := compareSamplePair(
model.SamplePair{
Timestamp: expectedMetric.Timestamp,
Value: expectedMetric.Value,
},
model.SamplePair{
Timestamp: actualMetric.Timestamp,
Value: actualMetric.Value,
},
queryEvaluationTime,
opts,
)
if err != nil {
return fmt.Errorf("float sample pair does not match for metric %s: %w", expectedMetric.Metric, err)
}
Expand All @@ -428,13 +433,18 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
} else if expectedMetric.Histogram == nil && actualMetric.Histogram != nil {
return fmt.Errorf("sample pair does not match for metric %s: expected float value but got histogram", expectedMetric.Metric)
} else { // Expected value is a histogram and the actual value is a histogram.
err := compareSampleHistogramPair(model.SampleHistogramPair{
Timestamp: expectedMetric.Timestamp,
Histogram: expectedMetric.Histogram,
}, model.SampleHistogramPair{
Timestamp: actualMetric.Timestamp,
Histogram: actualMetric.Histogram,
}, opts)
err := compareSampleHistogramPair(
model.SampleHistogramPair{
Timestamp: expectedMetric.Timestamp,
Histogram: expectedMetric.Histogram,
},
model.SampleHistogramPair{
Timestamp: actualMetric.Timestamp,
Histogram: actualMetric.Histogram,
},
queryEvaluationTime,
opts,
)
if err != nil {
return fmt.Errorf("histogram sample pair does not match for metric %s: %w", expectedMetric.Metric, err)
}
Expand All @@ -444,21 +454,21 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return nil
}

func allVectorSamplesWithinRecentSampleWindow(v model.Vector, opts SampleComparisonOptions) bool {
func allVectorSamplesWithinRecentSampleWindow(v model.Vector, queryEvaluationTime time.Time, opts SampleComparisonOptions) bool {
if opts.SkipRecentSamples == 0 {
return false
}

for _, sample := range v {
if time.Since(sample.Timestamp.Time()) > opts.SkipRecentSamples {
if queryEvaluationTime.Sub(sample.Timestamp.Time()) > opts.SkipRecentSamples {
return false
}
}

return true
}

func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparisonOptions) error {
func compareScalar(expectedRaw, actualRaw json.RawMessage, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
var expected, actual model.Scalar
err := json.Unmarshal(expectedRaw, &expected)
if err != nil {
Expand All @@ -470,20 +480,25 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, opts SampleComparison
return err
}

return compareSamplePair(model.SamplePair{
Timestamp: expected.Timestamp,
Value: expected.Value,
}, model.SamplePair{
Timestamp: actual.Timestamp,
Value: actual.Value,
}, opts)
return compareSamplePair(
model.SamplePair{
Timestamp: expected.Timestamp,
Value: expected.Value,
},
model.SamplePair{
Timestamp: actual.Timestamp,
Value: actual.Value,
},
queryEvaluationTime,
opts,
)
}

func compareSamplePair(expected, actual model.SamplePair, opts SampleComparisonOptions) error {
func compareSamplePair(expected, actual model.SamplePair, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
if expected.Timestamp != actual.Timestamp {
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp)
}
if opts.SkipRecentSamples > 0 && time.Since(expected.Timestamp.Time()) < opts.SkipRecentSamples {
if opts.SkipRecentSamples > 0 && queryEvaluationTime.Sub(expected.Timestamp.Time()) < opts.SkipRecentSamples {
return nil
}
if !compareSampleValue(float64(expected.Value), float64(actual.Value), opts) {
Expand All @@ -507,12 +522,12 @@ func compareSampleValue(first, second float64, opts SampleComparisonOptions) boo
return math.Abs(first-second) <= opts.Tolerance
}

func compareSampleHistogramPair(expected, actual model.SampleHistogramPair, opts SampleComparisonOptions) error {
func compareSampleHistogramPair(expected, actual model.SampleHistogramPair, queryEvaluationTime time.Time, opts SampleComparisonOptions) error {
if expected.Timestamp != actual.Timestamp {
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp)
}

if opts.SkipRecentSamples > 0 && time.Since(expected.Timestamp.Time()) < opts.SkipRecentSamples {
if opts.SkipRecentSamples > 0 && queryEvaluationTime.Sub(expected.Timestamp.Time()) < opts.SkipRecentSamples {
return nil
}

Expand Down
Loading

0 comments on commit 3509c46

Please sign in to comment.