-
Notifications
You must be signed in to change notification settings - Fork 133
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This showcases that we *can* make it work; just have to do a lot of cleanup to get there
- Loading branch information
Showing
5 changed files
with
271 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
package alertbackfill | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/promql" | ||
"github.com/prometheus/prometheus/rules" | ||
"github.com/prometheus/prometheus/storage" | ||
|
||
"github.com/jacksontj/promxy/pkg/promclient" | ||
"github.com/jacksontj/promxy/pkg/proxyquerier" | ||
) | ||
|
||
// NewAlertBackfillQueryable returns a new AlertBackfillQueryable | ||
func NewAlertBackfillQueryable(e *promql.Engine, q storage.Queryable) *AlertBackfillQueryable { | ||
return &AlertBackfillQueryable{e: e, q: q} | ||
} | ||
|
||
// AlertBackfillQueryable returns a storage.Queryable that will handle returning | ||
// results for the RuleManager alert backfill. This is done by first attempting | ||
// to query the downstream store and if no result is found it will "recreate" the | ||
// the series by re-running the necessary query to get the data back | ||
type AlertBackfillQueryable struct { | ||
e *promql.Engine | ||
m *rules.Manager | ||
q storage.Queryable | ||
} | ||
|
||
// SetManager sets the rules.Manager -- this is required as the manager needs the Queryable at creation | ||
func (q *AlertBackfillQueryable) SetManager(m *rules.Manager) { | ||
q.m = m | ||
} | ||
|
||
// Querier returns an AlertBackfillQuerier | ||
func (q *AlertBackfillQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { | ||
return &AlertBackfillQuerier{ | ||
e: q.e, | ||
m: q.m, | ||
q: q.q, | ||
ruleValues: make(map[string]*promql.Result), | ||
mint: mint, | ||
maxt: maxt, | ||
}, nil | ||
} | ||
|
||
// AlertBackfillQuerier will Query a downstream storage.Queryable for the | ||
// ALERTS_FOR_STATE series, if that series is not found -- it will then | ||
// run the appropriate query_range equivalent to re-generate the data. | ||
type AlertBackfillQuerier struct { | ||
e *promql.Engine | ||
m *rules.Manager | ||
q storage.Queryable | ||
|
||
// map of (groupidx.ruleidx) -> result | ||
ruleValues map[string]*promql.Result | ||
mint int64 | ||
maxt int64 | ||
} | ||
|
||
// Select will fetch and return the ALERTS_FOR_STATE series for the given matchers | ||
func (q *AlertBackfillQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { | ||
// first, we call the actual downstream to see if we have the correct data | ||
// this will return something if the remote_write from promxy has been saved | ||
// somewhere where promxy is also configured to read from | ||
querier, err := q.q.Querier(context.TODO(), q.mint, q.maxt) | ||
if err != nil { | ||
return proxyquerier.NewSeriesSet(nil, nil, err) | ||
} | ||
ret := querier.Select(sortSeries, hints, matchers...) | ||
downstreamSeries := make([]storage.Series, 0) | ||
for ret.Next() { | ||
downstreamSeries = append(downstreamSeries, ret.At()) | ||
} | ||
// If the raw queryable had something; return that | ||
if len(downstreamSeries) > 0 { | ||
return proxyquerier.NewSeriesSet(downstreamSeries, ret.Warnings(), ret.Err()) | ||
} | ||
|
||
// right now upstream alert state restore *just* uses the name of the alert and labels | ||
// this causes issues if there are any additional alerts (as only one will be queried | ||
// and it's state will be used for all alerts with the same name + labels). | ||
// TODO: Once upstream fixes this (https://github.com/prometheus/prometheus/issues/12714) | ||
// we'll want to adjust this logic. For now we're effectively mirroring upstream logic | ||
// Now we need to "backfill" the data (regenerate the series that it would have queried) | ||
var ( | ||
matchingGroupIdx int | ||
matchingRuleIdx int | ||
matchingGroup *rules.Group | ||
matchingRule *rules.AlertingRule | ||
) | ||
alertname := matchers[1].Value | ||
|
||
FIND_RULE: | ||
for i, group := range q.m.RuleGroups() { | ||
RULE_LOOP: | ||
for ii, rule := range group.Rules() { | ||
alertingRule, ok := rule.(*rules.AlertingRule) | ||
if !ok { | ||
continue | ||
} | ||
// For now we check if the name and all given labels match | ||
// which is both the best we can do, and equivalent to | ||
// direct prometheus behavior | ||
if alertingRule.Name() == alertname { | ||
// Check the rule labels fit the matcher set | ||
for _, lbl := range alertingRule.Labels() { | ||
for _, m := range matchers[2:] { | ||
if lbl.Name == m.Name { | ||
if !m.Matches(lbl.Value) { | ||
continue RULE_LOOP | ||
} | ||
break | ||
} | ||
} | ||
} | ||
|
||
matchingGroupIdx = i | ||
matchingRuleIdx = ii | ||
matchingGroup = group | ||
matchingRule = alertingRule | ||
break FIND_RULE | ||
} | ||
} | ||
} | ||
|
||
// If we can't find a matching rule; return an empty set | ||
if matchingRule == nil { | ||
return proxyquerier.NewSeriesSet(nil, nil, nil) | ||
} | ||
|
||
step := matchingGroup.Interval() // TODO: better variable naming | ||
key := fmt.Sprintf("%d.%d", matchingGroupIdx, matchingRuleIdx) | ||
result, ok := q.ruleValues[key] | ||
// If we haven't queried this *rule* before; lets load that | ||
if !ok { | ||
now := time.Now() | ||
query, err := q.e.NewRangeQuery(q.q, &promql.QueryOpts{false}, matchingRule.Query().String(), now.Add(-1*matchingRule.HoldDuration()).Add(-1*step), now, step) | ||
if err != nil { | ||
return proxyquerier.NewSeriesSet(nil, nil, err) | ||
} | ||
|
||
result = query.Exec(context.TODO()) | ||
q.ruleValues[key] = result | ||
} | ||
|
||
if result.Err != nil { | ||
return proxyquerier.NewSeriesSet(nil, result.Warnings, result.Err) | ||
} | ||
|
||
// Now we need to filter+convert the result | ||
var val model.Value | ||
// convert promql.Value -> model.Value | ||
switch v := result.Value.(type) { | ||
case promql.Matrix: | ||
matrix := make(model.Matrix, 0, v.Len()) | ||
MATRIXVALUE_LOOP: | ||
for _, item := range v { | ||
metric := make(model.Metric) | ||
for _, label := range item.Metric { | ||
metric[model.LabelName(label.Name)] = model.LabelValue(label.Value) | ||
} | ||
|
||
// Filter to results that match our matchers | ||
for _, matcher := range matchers { | ||
switch matcher.Name { | ||
case model.MetricNameLabel, model.AlertNameLabel: | ||
continue | ||
default: | ||
// Check if the matcher is against a label the rule will add | ||
if matchingRule.Labels().Has(matcher.Name) { | ||
continue | ||
} | ||
// If the matcher doesn't exist; skip this series | ||
if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) { | ||
continue MATRIXVALUE_LOOP | ||
} | ||
} | ||
} | ||
|
||
// Overwrite the __name__ and alertname | ||
metric[model.MetricNameLabel] = model.LabelValue(matchers[0].Value) | ||
metric[model.AlertNameLabel] = model.LabelValue(matchers[1].Value) | ||
|
||
// TODO: check that the rule manager doesn't add any more labels | ||
// Add the labels which the alert would add | ||
for _, label := range matchingRule.Labels() { | ||
metric[model.LabelName(label.Name)] = model.LabelValue(label.Value) | ||
} | ||
|
||
var ( | ||
activeAt model.SampleValue | ||
lastPoint time.Time | ||
) | ||
// Now we have to convert the *actual* result into the series that is stored for the ALERTS | ||
// TODO: move to another method (with its own tests) | ||
samples := make([]model.SamplePair, len(item.Points)) | ||
for x, sample := range item.Points { | ||
sampleTime := model.Time(sample.T).Time() | ||
|
||
// If we are missing a point in the matrix; then we are going to assume | ||
// that the series cleared, so we need to reset activeAt | ||
if sampleTime.Sub(lastPoint) > step { | ||
activeAt = 0 | ||
} | ||
lastPoint = sampleTime | ||
|
||
// if there is no `activeAt` set; lets set this timestamp (earliest timestamp in the steps that has a point) | ||
if activeAt == 0 { | ||
activeAt = model.SampleValue(model.Time(sample.T).Unix()) | ||
} | ||
|
||
samples[x] = model.SamplePair{ | ||
Timestamp: model.Time(sample.T), | ||
//Value: model.SampleValue(sample.V), | ||
// The timestamp is a unix timestapm of ActiveAt, so we'll set this to the timestamp instead of the value | ||
Value: activeAt, | ||
} | ||
} | ||
|
||
matrix = append(matrix, &model.SampleStream{ | ||
Metric: metric, | ||
Values: samples, | ||
}) | ||
} | ||
val = matrix | ||
|
||
// TODO: check about scalar or subquery? | ||
// We should only get a Matrix result; but just in case we'll have a catchall to not panic :) | ||
default: | ||
return proxyquerier.NewSeriesSet(nil, nil, nil) | ||
} | ||
|
||
iterators := promclient.IteratorsForValue(val) | ||
|
||
series := make([]storage.Series, len(iterators)) | ||
for i, iterator := range iterators { | ||
series[i] = &proxyquerier.Series{iterator} | ||
} | ||
|
||
return proxyquerier.NewSeriesSet(series, result.Warnings, nil) | ||
} | ||
|
||
func (q *AlertBackfillQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { | ||
return nil, nil, fmt.Errorf("not implemented") | ||
} | ||
|
||
func (q *AlertBackfillQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { | ||
return nil, nil, fmt.Errorf("not implemented") | ||
} | ||
|
||
func (q *AlertBackfillQuerier) Close() error { return nil } |