Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to prefer max value at a given timestamp when merging samples #600

Merged
merged 4 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/promxy/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ promxy:
end: -1h
truncate: false

# when merging sample streams, the max value at a given timestamp will be preferred
prefer_max: false

# absolute_time_range defines an absolute time range that this server group contains.
# this is completely optional and start/end are both optional as well
# and example is if the servergroup has been deprecated and is no longer receiving data
Expand Down
14 changes: 8 additions & 6 deletions pkg/promclient/multi_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ func NormalizePromError(err error) error {
type MultiAPIMetricFunc func(i int, api, status string, took float64)

// NewMustMultiAPI returns a MultiAPI
func NewMustMultiAPI(apis []API, antiAffinity model.Time, metricFunc MultiAPIMetricFunc, requiredCount int) *MultiAPI {
a, err := NewMultiAPI(apis, antiAffinity, metricFunc, requiredCount)
func NewMustMultiAPI(apis []API, antiAffinity model.Time, metricFunc MultiAPIMetricFunc, requiredCount int, preferMax bool) *MultiAPI {
a, err := NewMultiAPI(apis, antiAffinity, metricFunc, requiredCount, preferMax)
if err != nil {
panic(err)
}
return a
}

// NewMultiAPI returns a MultiAPI
func NewMultiAPI(apis []API, antiAffinity model.Time, metricFunc MultiAPIMetricFunc, requiredCount int) (*MultiAPI, error) {
func NewMultiAPI(apis []API, antiAffinity model.Time, metricFunc MultiAPIMetricFunc, requiredCount int, preferMax bool) (*MultiAPI, error) {
fingerprintCounts := make(map[model.Fingerprint]int)
apiFingerprints := make([]model.Fingerprint, len(apis))
for i, api := range apis {
Expand All @@ -97,6 +97,7 @@ func NewMultiAPI(apis []API, antiAffinity model.Time, metricFunc MultiAPIMetricF
antiAffinity: antiAffinity,
metricFunc: metricFunc,
requiredCount: requiredCount,
preferMax: preferMax,
}, nil
}

Expand All @@ -107,6 +108,7 @@ type MultiAPI struct {
antiAffinity model.Time
metricFunc MultiAPIMetricFunc
requiredCount int // number "per key" that we require to respond
preferMax bool
}

func (m *MultiAPI) recordMetric(i int, api, status string, took float64) {
Expand Down Expand Up @@ -335,7 +337,7 @@ func (m *MultiAPI) Query(ctx context.Context, query string, ts time.Time) (model
result = ret.v
} else {
var err error
result, err = promhttputil.MergeValues(m.antiAffinity, result, ret.v)
result, err = promhttputil.MergeValues(m.antiAffinity, result, ret.v, m.preferMax)
if err != nil {
return nil, warnings.Warnings(), err
}
Expand Down Expand Up @@ -415,7 +417,7 @@ func (m *MultiAPI) QueryRange(ctx context.Context, query string, r v1.Range) (mo
result = ret.v
} else {
var err error
result, err = promhttputil.MergeValues(m.antiAffinity, result, ret.v)
result, err = promhttputil.MergeValues(m.antiAffinity, result, ret.v, m.preferMax)
if err != nil {
return nil, warnings.Warnings(), err
}
Expand Down Expand Up @@ -572,7 +574,7 @@ func (m *MultiAPI) GetValue(ctx context.Context, start, end time.Time, matchers
result = ret.v
} else {
var err error
result, err = promhttputil.MergeValues(m.antiAffinity, result, ret.v)
result, err = promhttputil.MergeValues(m.antiAffinity, result, ret.v, m.preferMax)
if err != nil {
return nil, warnings.Warnings(), err
}
Expand Down
40 changes: 20 additions & 20 deletions pkg/promclient/multi_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestMultiAPIMerging(t *testing.T) {
a: NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
labelNames: []string{"a"},
labelValues: []model.LabelValue{"1", "2"},
v: model.Vector{
Expand All @@ -226,12 +226,12 @@ func TestMultiAPIMerging(t *testing.T) {
NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 2),
}, model.Time(0), nil, 1, false),
}, model.Time(0), nil, 2, false),
labelNames: []string{"a"},
labelValues: []model.LabelValue{"1", "2"},
v: model.Vector{
Expand All @@ -250,23 +250,23 @@ func TestMultiAPIMerging(t *testing.T) {
NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 2),
}, model.Time(0), nil, 1, false),
}, model.Time(0), nil, 2, false),
NewMustMultiAPI([]API{
NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"b": "1"}},
&AddLabelClient{stub, model.LabelSet{"b": "1"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"b": "2"}},
&AddLabelClient{stub, model.LabelSet{"b": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 2),
}, model.Time(0), nil, 2),
}, model.Time(0), nil, 1, false),
}, model.Time(0), nil, 2, false),
}, model.Time(0), nil, 2, false),
labelNames: []string{"a", "b"},
labelValues: []model.LabelValue{"1", "2"},
v: model.Vector{
Expand Down Expand Up @@ -295,12 +295,12 @@ func TestMultiAPIMerging(t *testing.T) {
NewMustMultiAPI([]API{
&errorAPI{&AddLabelClient{stub, model.LabelSet{"a": "1"}}, fmt.Errorf("")},
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
NewMustMultiAPI([]API{
&errorAPI{&AddLabelClient{stub, model.LabelSet{"a": "2"}}, fmt.Errorf("")},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 2),
}, model.Time(0), nil, 1, false),
}, model.Time(0), nil, 2, false),
labelNames: []string{"a"},
labelValues: []model.LabelValue{"1", "2"},
v: model.Vector{
Expand All @@ -318,20 +318,20 @@ func TestMultiAPIMerging(t *testing.T) {
NewMustMultiAPI([]API{
&errorAPI{&AddLabelClient{stub, model.LabelSet{"a": "1"}}, fmt.Errorf("")},
&errorAPI{&AddLabelClient{stub, model.LabelSet{"a": "1"}}, fmt.Errorf("")},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
NewMustMultiAPI([]API{
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 2),
}, model.Time(0), nil, 1, false),
}, model.Time(0), nil, 2, false),
err: true,
},
// if in a multi, all that "match" error, we should error
{
a: NewMustMultiAPI([]API{
&errorAPI{&AddLabelClient{stub, model.LabelSet{"a": "1"}}, fmt.Errorf("")},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
err: true,
},
// however, in a multi if a single one succeeds for a given "group" then it should pass
Expand All @@ -340,7 +340,7 @@ func TestMultiAPIMerging(t *testing.T) {
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
&errorAPI{&AddLabelClient{stub, model.LabelSet{"a": "1"}}, fmt.Errorf("")},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
labelNames: []string{"a"},
labelValues: []model.LabelValue{"1", "2"},
v: model.Vector{
Expand All @@ -358,7 +358,7 @@ func TestMultiAPIMerging(t *testing.T) {
stub,
&AddLabelClient{stub, model.LabelSet{"a": "1"}},
&AddLabelClient{stub, model.LabelSet{"a": "2"}},
}, model.Time(0), nil, 1),
}, model.Time(0), nil, 1, false),
labelNames: []string{"a"},
labelValues: []model.LabelValue{"1", "2"},
v: model.Vector{
Expand Down
50 changes: 45 additions & 5 deletions pkg/promhttputil/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func ValueAddLabelSet(a model.Value, l model.LabelSet) error {
}

// MergeValues merges values `a` and `b` with the given antiAffinityBuffer
func MergeValues(antiAffinityBuffer model.Time, a, b model.Value) (model.Value, error) {
func MergeValues(antiAffinityBuffer model.Time, a, b model.Value, preferMax bool) (model.Value, error) {
if a == nil {
return b, nil
}
Expand All @@ -88,6 +88,10 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value) (model.Value,
case *model.Scalar:
bTyped := b.(*model.Scalar)

if preferMax && bTyped.Value > aTyped.Value && bTyped.Timestamp != 0 {
return bTyped, nil
}

if aTyped.Value != 0 && aTyped.Timestamp != 0 {
return aTyped, nil
}
Expand Down Expand Up @@ -116,7 +120,8 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value) (model.Value,
// If we've seen this fingerPrint before, lets make sure that a value exists
if index, ok := fingerPrintMap[finger]; ok {
// Only replace if we have no value (which seems reasonable)
if newValue[index].Value == model.SampleValue(0) {
// Or we prefer max value and there is a bigger value
if newValue[index].Value == model.SampleValue(0) || preferMax && newValue[index].Value < item.Value {
newValue[index].Value = item.Value
}
} else {
Expand Down Expand Up @@ -146,7 +151,7 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value) (model.Value,
// If we've seen this fingerPrint before, lets make sure that a value exists
if index, ok := fingerPrintMap[finger]; ok {
// TODO: check this error? For now the only one is sig collision, which we check
newValue[index], _ = MergeSampleStream(antiAffinityBuffer, newValue[index], stream)
newValue[index], _ = MergeSampleStream(antiAffinityBuffer, newValue[index], stream, preferMax)
} else {
newValue = append(newValue, stream)
fingerPrintMap[finger] = len(newValue) - 1
Expand Down Expand Up @@ -175,7 +180,7 @@ func MergeValues(antiAffinityBuffer model.Time, a, b model.Value) (model.Value,
// this problem we're going to *not* merge any datapoint within antiAffinityBuffer of another point
// we have. This means we can tolerate antiAffinityBuffer/2 on either side (which can be used by either
// clock skew or from this scrape skew).
func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream) (*model.SampleStream, error) {
func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream, preferMax bool) (*model.SampleStream, error) {
if a.Metric.Fingerprint() != b.Metric.Fingerprint() {
return nil, fmt.Errorf("cannot merge mismatch fingerprints")
}
Expand Down Expand Up @@ -220,6 +225,8 @@ func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream)

}

lastOffset := bOffset

for _, aValue := range a.Values {
// if we have no points, this one by definition is valid
if len(newValues) == 0 {
Expand All @@ -242,7 +249,40 @@ func MergeSampleStream(antiAffinityBuffer model.Time, a, b *model.SampleStream)
}
}
}
newValues = append(newValues, aValue)

if !preferMax {
newValues = append(newValues, aValue)
} else {
done := false

// see if there is a sample from b within antiAffinityBuffer, that is larger than a
for i := lastOffset; i < len(b.Values); i++ {
bValue := b.Values[i]
// b is not within antiAffinityBuffer of a
if bValue.Timestamp >= (aValue.Timestamp+antiAffinityBuffer) && bValue.Timestamp != aValue.Timestamp {
break
}
// b is within antiAffinityBuffer of a
if bValue.Timestamp == aValue.Timestamp || bValue.Timestamp > (aValue.Timestamp-antiAffinityBuffer) {
// no need to iterate b before this offset next time
lastOffset = i
if bValue.Value > aValue.Value {
// use the larger value from b
// note: there may be larger values from b after this, we will choose the first one we find
// within the antiAffinityBuffer
bValue.Timestamp = aValue.Timestamp
newValues = append(newValues, bValue)
done = true
}
break
}
}

if !done {
//use the larger value from a
newValues = append(newValues, aValue)
}
}
}

lastTime := newValues[len(newValues)-1].Timestamp
Expand Down
Loading