diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 597c2704b37..b49133cdec8 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -158,6 +158,9 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: "nh_X / a_X", }, + { + Expr: "a_X == b_X", + }, { Expr: "2 * a_X", }, diff --git a/pkg/streamingpromql/operators/binops/binary_operation.go b/pkg/streamingpromql/operators/binops/binary_operation.go index b499ad4f387..3bc3a82e2f6 100644 --- a/pkg/streamingpromql/operators/binops/binary_operation.go +++ b/pkg/streamingpromql/operators/binops/binary_operation.go @@ -5,9 +5,11 @@ package binops import ( "fmt" "slices" + "time" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" @@ -82,6 +84,43 @@ func groupLabelsFunc(vectorMatching parser.VectorMatching, op parser.ItemType, r } } +func formatConflictError( + firstConflictingSeriesIndex int, + secondConflictingSeriesIndex int, + description string, + ts int64, + sourceSeriesMetadata []types.SeriesMetadata, + side string, + vectorMatching parser.VectorMatching, + op parser.ItemType, + returnBool bool, +) error { + firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels + groupLabels := groupLabelsFunc(vectorMatching, op, returnBool)(firstConflictingSeriesLabels) + + if secondConflictingSeriesIndex == -1 { + return fmt.Errorf( + "found %s for the match group %s on the %s side of the operation at timestamp %s", + description, + groupLabels, + side, + timestamp.Time(ts).Format(time.RFC3339Nano), + ) + } + + secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels + + return fmt.Errorf( + "found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s", + description, + groupLabels, + side, + timestamp.Time(ts).Format(time.RFC3339Nano), + firstConflictingSeriesLabels, + secondConflictingSeriesLabels, + ) +} + // filterSeries returns data filtered based on the mask provided. // // mask is expected to contain one value for each time step in the query time range. diff --git a/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go index 206044c3051..2bdda7d501a 100644 --- a/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go @@ -11,10 +11,8 @@ import ( "fmt" "slices" "sort" - "time" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -618,13 +616,13 @@ func (g *GroupedVectorVectorBinaryOperation) updateOneSidePresence(side *oneSide for _, p := range seriesData.Floats { if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 { - return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness()) + return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool) } } for _, p := range seriesData.Histograms { if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 { - return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness()) + return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool) } } } @@ -646,7 +644,8 @@ func (g *GroupedVectorVectorBinaryOperation) mergeOneSide(data []types.InstantVe } if conflict != nil { - return types.InstantVectorSeriesData{}, g.formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness()) + err := formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool) + return types.InstantVectorSeriesData{}, err } return merged, nil @@ -689,40 +688,6 @@ func (g *GroupedVectorVectorBinaryOperation) mergeManySide(data []types.InstantV return merged, nil } -func (g *GroupedVectorVectorBinaryOperation) formatConflictError( - firstConflictingSeriesIndex int, - secondConflictingSeriesIndex int, - description string, - ts int64, - sourceSeriesMetadata []types.SeriesMetadata, - side string, -) error { - firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels - groupLabels := groupLabelsFunc(g.VectorMatching, g.Op, g.ReturnBool)(firstConflictingSeriesLabels) - - if secondConflictingSeriesIndex == -1 { - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s", - description, - groupLabels, - side, - timestamp.Time(ts).Format(time.RFC3339Nano), - ) - } - - secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels - - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s", - description, - groupLabels, - side, - timestamp.Time(ts).Format(time.RFC3339Nano), - firstConflictingSeriesLabels, - secondConflictingSeriesLabels, - ) -} - func (g *GroupedVectorVectorBinaryOperation) oneSideHandedness() string { switch g.VectorMatching.Card { case parser.CardOneToMany: diff --git a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go index 70f1204d7bf..588cdfbf98c 100644 --- a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go @@ -7,13 +7,11 @@ package binops import ( "context" - "fmt" "math" "sort" - "time" "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -48,13 +46,14 @@ type OneToOneVectorVectorBinaryOperation struct { expressionPosition posrange.PositionRange annotations *annotations.Annotations + timeRange types.QueryTimeRange } var _ types.InstantVectorOperator = &OneToOneVectorVectorBinaryOperation{} type oneToOneBinaryOperationOutputSeries struct { - leftSeriesIndices []int - rightSeriesIndices []int + leftSeriesIndices []int + rightSide *oneToOneBinaryOperationRightSide } // latestLeftSeries returns the index of the last series from the left source needed for this output series. @@ -66,9 +65,43 @@ func (s oneToOneBinaryOperationOutputSeries) latestLeftSeries() int { // latestRightSeries returns the index of the last series from the right source needed for this output series. // -// It assumes that rightSeriesIndices is sorted in ascending order. +// It assumes that rightSide.rightSeriesIndices is sorted in ascending order. func (s oneToOneBinaryOperationOutputSeries) latestRightSeries() int { - return s.rightSeriesIndices[len(s.rightSeriesIndices)-1] + return s.rightSide.rightSeriesIndices[len(s.rightSide.rightSeriesIndices)-1] +} + +type oneToOneBinaryOperationRightSide struct { + // If this right side is used for multiple output series and has not been populated, rightSeriesIndices will not be nil. + // If this right side has been populated, rightSeriesIndices will be nil. + rightSeriesIndices []int + mergedData types.InstantVectorSeriesData + + // The number of output series that use the same series from the right side. + // Will only be greater than 1 for comparison binary operations without the bool modifier + // where the input series on the left side have different metric names. + outputSeriesCount int + + // Time steps at which we've seen samples for any left side that matches with this right side. + // Each value is the index of the source series of the sample, or -1 if no sample has been seen for this time step yet. + leftSidePresence []int +} + +// updatePresence records the presence of a sample from the left side series with index seriesIdx at the timestamp with index timestampIdx. +// +// If there is already a sample present from another series at the same timestamp, updatePresence returns that series' index, or +// -1 if there was no sample present at the same timestamp from another series. +func (g *oneToOneBinaryOperationRightSide) updatePresence(timestampIdx int64, seriesIdx int) int { + if existing := g.leftSidePresence[timestampIdx]; existing != -1 { + return existing + } + + g.leftSidePresence[timestampIdx] = seriesIdx + return -1 +} + +type oneToOneBinaryOperationOutputSeriesWithLabels struct { + labels labels.Labels + series *oneToOneBinaryOperationOutputSeries } func NewOneToOneVectorVectorBinaryOperation( @@ -80,6 +113,7 @@ func NewOneToOneVectorVectorBinaryOperation( memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, + timeRange types.QueryTimeRange, ) (*OneToOneVectorVectorBinaryOperation, error) { e, err := newVectorVectorBinaryOperationEvaluator(op, returnBool, memoryConsumptionTracker, annotations, expressionPosition) if err != nil { @@ -97,6 +131,7 @@ func NewOneToOneVectorVectorBinaryOperation( evaluator: e, expressionPosition: expressionPosition, annotations: annotations, + timeRange: timeRange, } return b, nil @@ -182,91 +217,124 @@ func (b *OneToOneVectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Con // - a list indicating which series from the left side are needed to compute the output // - a list indicating which series from the right side are needed to compute the output func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*oneToOneBinaryOperationOutputSeries, []bool, []bool, error) { - labelsFunc := groupLabelsFunc(b.VectorMatching, b.Op, b.ReturnBool) groupKeyFunc := vectorMatchingGroupKeyFunc(b.VectorMatching) - outputSeriesMap := map[string]*oneToOneBinaryOperationOutputSeries{} - // Use the smaller side to populate the map of possible output series first. - // This should ensure we don't unnecessarily populate the output series map with series that will never match in most cases. - // (It's possible that all the series on the larger side all belong to the same group, but this is expected to be rare.) - smallerSide := b.leftMetadata - largerSide := b.rightMetadata - smallerSideIsLeftSide := len(b.leftMetadata) < len(b.rightMetadata) + // If the left side is smaller than the right, build a map of the possible groups from the left side + // to allow us to avoid creating unnecessary groups when iterating through the right side in computeRightSideGroups. + // This optimisation assumes that most series on either side match at most one series on the other side, + // which is generally true for one-to-one matching. + // FIXME: a possible improvement would be to only bother with this if the left side is significantly smaller + var leftSideGroupsMap map[string]struct{} - if !smallerSideIsLeftSide { - smallerSide = b.rightMetadata - largerSide = b.leftMetadata + if len(b.leftMetadata) < len(b.rightMetadata) { + leftSideGroupsMap = b.computeLeftSideGroups(groupKeyFunc) } - for idx, s := range smallerSide { - groupKey := groupKeyFunc(s.Labels) - series, exists := outputSeriesMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + rightSideGroupsMap := b.computeRightSideGroups(leftSideGroupsMap, groupKeyFunc) - if !exists { - series = &oneToOneBinaryOperationOutputSeries{} - outputSeriesMap[string(groupKey)] = series - } + outputSeriesMap := map[string]oneToOneBinaryOperationOutputSeriesWithLabels{} - if smallerSideIsLeftSide { - series.leftSeriesIndices = append(series.leftSeriesIndices, idx) - } else { - series.rightSeriesIndices = append(series.rightSeriesIndices, idx) - } + leftSeriesUsed, err := types.BoolSlicePool.Get(len(b.leftMetadata), b.MemoryConsumptionTracker) + if err != nil { + return nil, nil, nil, nil, err } - for idx, s := range largerSide { - groupKey := groupKeyFunc(s.Labels) + rightSeriesUsed, err := types.BoolSlicePool.Get(len(b.rightMetadata), b.MemoryConsumptionTracker) + if err != nil { + return nil, nil, nil, nil, err + } - // Important: don't extract the string(...) call below - passing it directly allows us to avoid allocating it. - if series, exists := outputSeriesMap[string(groupKey)]; exists { - if smallerSideIsLeftSide { - // Currently iterating through right side. - series.rightSeriesIndices = append(series.rightSeriesIndices, idx) - } else { - series.leftSeriesIndices = append(series.leftSeriesIndices, idx) + leftSeriesUsed = leftSeriesUsed[:len(b.leftMetadata)] + rightSeriesUsed = rightSeriesUsed[:len(b.rightMetadata)] + labelsFunc := groupLabelsFunc(b.VectorMatching, b.Op, b.ReturnBool) + outputSeriesLabelsBytes := make([]byte, 0, 1024) + + for leftSeriesIndex, s := range b.leftMetadata { + outputSeriesLabels := labelsFunc(s.Labels) + outputSeriesLabelsBytes = outputSeriesLabels.Bytes(outputSeriesLabelsBytes) // FIXME: it'd be better if we could just get the underlying byte slice without copying here + outputSeries, exists := outputSeriesMap[string(outputSeriesLabelsBytes)] + + if !exists { + groupKey := groupKeyFunc(s.Labels) + + // Important: don't extract the string(...) call below - passing it directly allows us to avoid allocating it. + rightSide, exists := rightSideGroupsMap[string(groupKey)] + + if !exists { + // No matching series on the right side. + continue + } + + if rightSide.outputSeriesCount == 0 { + // First output series the right side has matched to. + for _, rightSeriesIndex := range rightSide.rightSeriesIndices { + rightSeriesUsed[rightSeriesIndex] = true + } } - } - } - // Remove series that cannot produce samples. - for seriesLabels, outputSeries := range outputSeriesMap { - if len(outputSeries.leftSeriesIndices) == 0 || len(outputSeries.rightSeriesIndices) == 0 { - // No matching series on at least one side for this output series, so output series will have no samples. Remove it. - delete(outputSeriesMap, seriesLabels) + rightSide.outputSeriesCount++ + + outputSeries = oneToOneBinaryOperationOutputSeriesWithLabels{ + labels: outputSeriesLabels, + series: &oneToOneBinaryOperationOutputSeries{rightSide: rightSide}, + } + + outputSeriesMap[string(outputSeriesLabelsBytes)] = outputSeries } + + outputSeries.series.leftSeriesIndices = append(outputSeries.series.leftSeriesIndices, leftSeriesIndex) + leftSeriesUsed[leftSeriesIndex] = true } allMetadata := types.GetSeriesMetadataSlice(len(outputSeriesMap)) allSeries := make([]*oneToOneBinaryOperationOutputSeries, 0, len(outputSeriesMap)) - leftSeriesUsed, err := types.BoolSlicePool.Get(len(b.leftMetadata), b.MemoryConsumptionTracker) - if err != nil { - return nil, nil, nil, nil, err + for _, outputSeries := range outputSeriesMap { + allMetadata = append(allMetadata, types.SeriesMetadata{Labels: outputSeries.labels}) + allSeries = append(allSeries, outputSeries.series) } - rightSeriesUsed, err := types.BoolSlicePool.Get(len(b.rightMetadata), b.MemoryConsumptionTracker) - if err != nil { - return nil, nil, nil, nil, err + return allMetadata, allSeries, leftSeriesUsed, rightSeriesUsed, nil +} + +func (b *OneToOneVectorVectorBinaryOperation) computeLeftSideGroups(groupKeyFunc func(labels.Labels) []byte) map[string]struct{} { + m := map[string]struct{}{} + + for _, s := range b.leftMetadata { + groupKey := groupKeyFunc(s.Labels) + if _, exists := m[string(groupKey)]; !exists { + m[string(groupKey)] = struct{}{} + } } - leftSeriesUsed = leftSeriesUsed[:len(b.leftMetadata)] - rightSeriesUsed = rightSeriesUsed[:len(b.rightMetadata)] + return m +} - for _, outputSeries := range outputSeriesMap { - firstSeriesLabels := b.leftMetadata[outputSeries.leftSeriesIndices[0]].Labels - allMetadata = append(allMetadata, types.SeriesMetadata{Labels: labelsFunc(firstSeriesLabels)}) - allSeries = append(allSeries, outputSeries) +func (b *OneToOneVectorVectorBinaryOperation) computeRightSideGroups(leftSideGroupsMap map[string]struct{}, groupKeyFunc func(labels.Labels) []byte) map[string]*oneToOneBinaryOperationRightSide { + m := map[string]*oneToOneBinaryOperationRightSide{} + + for idx, s := range b.rightMetadata { + groupKey := groupKeyFunc(s.Labels) + + if leftSideGroupsMap != nil { + // Left side is smaller than the right, check if there's any series on the left that could match this right side series. - for _, leftSeriesIndex := range outputSeries.leftSeriesIndices { - leftSeriesUsed[leftSeriesIndex] = true + if _, exists := leftSideGroupsMap[string(groupKey)]; !exists { + continue + } } - for _, rightSeriesIndex := range outputSeries.rightSeriesIndices { - rightSeriesUsed[rightSeriesIndex] = true + group, exists := m[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + + if !exists { + group = &oneToOneBinaryOperationRightSide{} + m[string(groupKey)] = group } + + group.rightSeriesIndices = append(group.rightSeriesIndices, idx) } - return allMetadata, allSeries, leftSeriesUsed, rightSeriesUsed, nil + return m } // sortSeries sorts metadata and series in place to try to minimise the number of input series we'll need to buffer in memory. @@ -350,28 +418,98 @@ func (b *OneToOneVectorVectorBinaryOperation) NextSeries(ctx context.Context) (t thisSeries := b.remainingSeries[0] b.remainingSeries = b.remainingSeries[1:] + rightSide := thisSeries.rightSide + + if rightSide.rightSeriesIndices != nil { + // Right side hasn't been populated yet. + if err := b.populateRightSide(ctx, rightSide); err != nil { + return types.InstantVectorSeriesData{}, err + } + } + + // We don't need to return thisSeries.rightSide.mergedData here - computeResult will return it below if this is the last output series that references this right side. + rightSide.outputSeriesCount-- + canMutateRightSide := rightSide.outputSeriesCount == 0 allLeftSeries, err := b.leftBuffer.GetSeries(ctx, thisSeries.leftSeriesIndices) if err != nil { return types.InstantVectorSeriesData{}, err } - mergedLeftSide, err := b.mergeSingleSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left") + for i, leftSeries := range allLeftSeries { + isLastLeftSeries := i == len(allLeftSeries)-1 + + allLeftSeries[i], err = b.evaluator.computeResult(leftSeries, rightSide.mergedData, true, canMutateRightSide && isLastLeftSeries) + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + // If the right side matches to many output series, check for conflicts between those left side series. + if rightSide.leftSidePresence != nil { + seriesIdx := thisSeries.leftSeriesIndices[i] + + if err := b.updateLeftSidePresence(rightSide, allLeftSeries[i], seriesIdx); err != nil { + return types.InstantVectorSeriesData{}, err + } + } + } + + mergedResult, err := b.mergeSingleSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left") if err != nil { return types.InstantVectorSeriesData{}, err } - allRightSeries, err := b.rightBuffer.GetSeries(ctx, thisSeries.rightSeriesIndices) + if rightSide.leftSidePresence != nil && rightSide.outputSeriesCount == 0 { + types.IntSlicePool.Put(rightSide.leftSidePresence, b.MemoryConsumptionTracker) + } + + return mergedResult, nil +} + +func (b *OneToOneVectorVectorBinaryOperation) populateRightSide(ctx context.Context, rightSide *oneToOneBinaryOperationRightSide) error { + allRightSeries, err := b.rightBuffer.GetSeries(ctx, rightSide.rightSeriesIndices) if err != nil { - return types.InstantVectorSeriesData{}, err + return err } - mergedRightSide, err := b.mergeSingleSide(allRightSeries, thisSeries.rightSeriesIndices, b.rightMetadata, "right") + rightSide.mergedData, err = b.mergeSingleSide(allRightSeries, rightSide.rightSeriesIndices, b.rightMetadata, "right") if err != nil { - return types.InstantVectorSeriesData{}, err + return err + } + + if rightSide.outputSeriesCount > 1 { + rightSide.leftSidePresence, err = types.IntSlicePool.Get(b.timeRange.StepCount, b.MemoryConsumptionTracker) + if err != nil { + return err + } + + rightSide.leftSidePresence = rightSide.leftSidePresence[:b.timeRange.StepCount] + + for i := range rightSide.leftSidePresence { + rightSide.leftSidePresence[i] = -1 + } + } + + // Signal that the right side has been populated. + rightSide.rightSeriesIndices = nil + + return nil +} + +func (b *OneToOneVectorVectorBinaryOperation) updateLeftSidePresence(rightSide *oneToOneBinaryOperationRightSide, leftSideData types.InstantVectorSeriesData, leftSideSeriesIdx int) error { + for _, p := range leftSideData.Floats { + if otherSeriesIdx := rightSide.updatePresence(b.timeRange.PointIndex(p.T), leftSideSeriesIdx); otherSeriesIdx != -1 { + return formatConflictError(otherSeriesIdx, leftSideSeriesIdx, "duplicate series", p.T, b.leftMetadata, "left", b.VectorMatching, b.Op, b.ReturnBool) + } + } + + for _, p := range leftSideData.Histograms { + if otherSeriesIdx := rightSide.updatePresence(b.timeRange.PointIndex(p.T), leftSideSeriesIdx); otherSeriesIdx != -1 { + return formatConflictError(otherSeriesIdx, leftSideSeriesIdx, "duplicate series", p.T, b.leftMetadata, "left", b.VectorMatching, b.Op, b.ReturnBool) + } } - return b.evaluator.computeResult(mergedLeftSide, mergedRightSide, true, true) + return nil } // mergeSingleSide exists to handle the case where one side of an output series has different source series at different time steps. @@ -402,30 +540,7 @@ func (b *OneToOneVectorVectorBinaryOperation) mergeSingleSide(data []types.Insta } func (b *OneToOneVectorVectorBinaryOperation) mergeConflictToError(conflict *operators.MergeConflict, sourceSeriesMetadata []types.SeriesMetadata, side string) error { - firstConflictingSeriesLabels := sourceSeriesMetadata[conflict.FirstConflictingSeriesIndex].Labels - groupLabels := groupLabelsFunc(b.VectorMatching, b.Op, b.ReturnBool)(firstConflictingSeriesLabels) - - if conflict.SecondConflictingSeriesIndex == -1 { - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s", - conflict.Description, - groupLabels, - side, - timestamp.Time(conflict.Timestamp).Format(time.RFC3339Nano), - ) - } - - secondConflictingSeriesLabels := sourceSeriesMetadata[conflict.SecondConflictingSeriesIndex].Labels - - return fmt.Errorf( - "found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s", - conflict.Description, - groupLabels, - side, - timestamp.Time(conflict.Timestamp).Format(time.RFC3339Nano), - firstConflictingSeriesLabels, - secondConflictingSeriesLabels, - ) + return formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, sourceSeriesMetadata, side, b.VectorMatching, b.Op, b.ReturnBool) } func (b *OneToOneVectorVectorBinaryOperation) Close() { diff --git a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go index bd697398b3a..0559f775f23 100644 --- a/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go @@ -230,8 +230,8 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "single output series": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{4}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{4}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -241,12 +241,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with one input series, read from both sides in same order and already sorted correctly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, }, @@ -256,12 +256,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with one input series, read from both sides in same order but sorted incorrectly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -271,12 +271,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with one input series, read from both sides in different order": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -286,12 +286,12 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "two output series, both with multiple input series": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1, 2}, - rightSeriesIndices: []int{0, 3}, + leftSeriesIndices: []int{1, 2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{0, 3}}, }, { - leftSeriesIndices: []int{0, 3}, - rightSeriesIndices: []int{1, 2}, + leftSeriesIndices: []int{0, 3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1, 2}}, }, }, @@ -301,16 +301,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, both with one input series, read from both sides in same order and already sorted correctly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{3}, - rightSeriesIndices: []int{3}, + leftSeriesIndices: []int{3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3}}, }, }, @@ -320,16 +320,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, both with one input series, read from both sides in same order but sorted incorrectly": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{3}, - rightSeriesIndices: []int{3}, + leftSeriesIndices: []int{3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3}}, }, { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -339,16 +339,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, both with one input series, read from both sides in different order": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{3}, - rightSeriesIndices: []int{3}, + leftSeriesIndices: []int{3}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, @@ -358,16 +358,16 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series, with multiple input series each": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{4, 5, 10}, - rightSeriesIndices: []int{2, 20}, + leftSeriesIndices: []int{4, 5, 10}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2, 20}}, }, { - leftSeriesIndices: []int{2, 4, 15}, - rightSeriesIndices: []int{3, 5, 50}, + leftSeriesIndices: []int{2, 4, 15}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{3, 5, 50}}, }, { - leftSeriesIndices: []int{3, 1}, - rightSeriesIndices: []int{1, 40}, + leftSeriesIndices: []int{3, 1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1, 40}}, }, }, @@ -377,20 +377,20 @@ func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { "multiple output series which depend on the same input series": { series: []*oneToOneBinaryOperationOutputSeries{ { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{1}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{1}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{2}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{2}}, }, { - leftSeriesIndices: []int{2}, - rightSeriesIndices: []int{1}, + leftSeriesIndices: []int{2}, + rightSide: &oneToOneBinaryOperationRightSide{rightSeriesIndices: []int{1}}, }, }, diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 6f44018866c..3c76763db20 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -276,7 +276,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types case parser.CardOneToMany, parser.CardManyToOne: return binops.NewGroupedVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange(), timeRange) case parser.CardOneToOne: - return binops.NewOneToOneVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + return binops.NewOneToOneVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange(), timeRange) default: return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with %v matching for '%v'", e.VectorMatching.Card, e.Op)) } diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index cb33558432e..f5725eb0399 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -1397,10 +1397,9 @@ load 6m left_side_b{env="test", pod="a"} 5 6 7 8 right_side{env="test", pod="a"} 2 2 7 7 -# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. -#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side -# left_side_a{pod="a"} _ 2 _ _ -# left_side_b{pod="a"} _ _ 7 _ +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side + left_side_a{pod="a"} _ 2 _ _ + left_side_b{pod="a"} _ _ 7 _ eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) @@ -1416,9 +1415,8 @@ eval_fail range from 0 to 18m step 6m right_side == bool ignoring(env) {__name__ # left_side_b{pod="a"} _ _ 7 _ # but instead both engines drop the metric names in the output. # This is accepted behaviour: https://github.com/prometheus/prometheus/issues/5326 -# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. -#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side -# {pod="a"} _ 2 7 _ +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side + {pod="a"} _ 2 7 _ eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool on(pod) right_side expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) @@ -1458,9 +1456,11 @@ load 6m left{pod="b"} 5 6 7 8 right 2 2 7 7 -# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. -# eval range from 0 to 18m step 6m left == ignoring(pod) right -# left _ 2 7 _ +eval range from 0 to 18m step 6m left == ignoring(pod) right + left _ 2 7 _ + +eval_fail range from 0 to 18m step 6m left == ignoring(pod) group_right right + expected_fail_regexp found duplicate series for the match group .* on the left (hand-)?side of the operation clear @@ -1470,10 +1470,9 @@ load 6m left_side_b{env="test", pod="a"} _ _ 7 8 right_side{env="test", pod="a"} 2 2 7 7 -# FIXME: MQE currently does not correctly handle this case. -#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side -# left_side_a{pod="a"} _ 2 _ _ -# left_side_b{pod="a"} _ _ 7 _ +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side + left_side_a{pod="a"} _ 2 _ _ + left_side_b{pod="a"} _ _ 7 _ eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side {pod="a"} 0 1 1 0