From cda6d5f3f43be1a1e6988473642f047537ceac8e Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Wed, 15 Mar 2023 21:05:54 -0700 Subject: [PATCH 01/10] Initial labelfilter implementation --- pkg/promclient/labelfilter.go | 122 +++++++++++++++++++++++++++++++++ pkg/servergroup/servergroup.go | 5 ++ 2 files changed, 127 insertions(+) create mode 100644 pkg/promclient/labelfilter.go diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go new file mode 100644 index 000000000..bce254c8c --- /dev/null +++ b/pkg/promclient/labelfilter.go @@ -0,0 +1,122 @@ +package promclient + +import ( + "context" + "sync" + "time" + + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql/parser" +) + +// TODO: config +func NewLabelFilterClient(a API) (*LabelFilterClient, error) { + c := &LabelFilterClient{ + API: a, + LabelsToFilter: []string{"__name__", "job", "version"}, + } + + // TODO; background + if err := c.Sync(context.TODO()); err != nil { + return nil, err + } + + return c, nil +} + +// LabelFilterClient proxies a client and adds the given labels to all results +type LabelFilterClient struct { + API + + LabelsToFilter []string // Which labels we want to pull to check + + // TODO: move to a local block (to disk)?? or optionally so? + // labelFilter is a map of labelName -> labelValue -> nothing (for quick lookups) + labelFilter map[string]map[string]struct{} +} + +func (c *LabelFilterClient) Sync(ctx context.Context) error { + filter := make(map[string]map[string]struct{}) + + for _, label := range c.LabelsToFilter { + labelFilter := make(map[string]struct{}) + // TODO: warn? + vals, _, err := c.LabelValues(ctx, label, nil, model.Time(0).Time(), model.Now().Time()) + if err != nil { + return err + } + for _, v := range vals { + labelFilter[string(v)] = struct{}{} + } + filter[label] = labelFilter + } + + c.labelFilter = filter + + return nil +} + +// Query performs a query for the given time. +func (c *LabelFilterClient) Query(ctx context.Context, query string, ts time.Time) (model.Value, v1.Warnings, error) { + // Parse out the promql query into expressions etc. + e, err := parser.ParseExpr(query) + if err != nil { + return nil, nil, err + } + + filterVisitor := NewFilterLabelVisitor(c.labelFilter) + if _, err := parser.Walk(ctx, filterVisitor, &parser.EvalStmt{Expr: e}, e, nil, nil); err != nil { + return nil, nil, err + } + if !filterVisitor.filterMatch { + return nil, nil, nil + } + + return c.API.Query(ctx, query, ts) +} + +func NewFilterLabelVisitor(filter map[string]map[string]struct{}) *FilterLabelVisitor { + return &FilterLabelVisitor{ + labelFilter: filter, + filterMatch: true, + } +} + +// FilterLabel implements the parser.Visitor interface to filter selectors based on a labelstet +type FilterLabelVisitor struct { + l sync.Mutex + labelFilter map[string]map[string]struct{} + filterMatch bool +} + +// Visit checks if the given node matches the labels in the filter +func (l *FilterLabelVisitor) Visit(node parser.Node, path []parser.Node) (w parser.Visitor, err error) { + switch nodeTyped := node.(type) { + case *parser.VectorSelector: + for _, matcher := range nodeTyped.LabelMatchers { + for labelName, labelFilter := range l.labelFilter { + if matcher.Name == labelName { + match := false + // Check that there is a match somewhere! + for v := range labelFilter { + if matcher.Matches(v) { + match = true + break + } + } + if !match { + l.l.Lock() + l.filterMatch = false + l.l.Unlock() + return nil, nil + } + } + } + } + case *parser.MatrixSelector: + // TODO: + } + + return l, nil +} diff --git a/pkg/servergroup/servergroup.go b/pkg/servergroup/servergroup.go index 66a3db405..5c4e4769b 100644 --- a/pkg/servergroup/servergroup.go +++ b/pkg/servergroup/servergroup.go @@ -252,6 +252,11 @@ func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgrou apiClient = &promclient.DebugAPI{apiClient, u.String()} } + apiClient, err = promclient.NewLabelFilterClient(apiClient) + if err != nil { + return err + } + apiClients = append(apiClients, apiClient) } } From 7054f1fb95643da91d11cdcb32ab64f411b71d71 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 16 Mar 2023 14:42:13 -0700 Subject: [PATCH 02/10] Add resync on interval support and config --- pkg/promclient/labelfilter.go | 89 +++++++++++++++++++++++++++++----- pkg/servergroup/config.go | 3 ++ pkg/servergroup/servergroup.go | 30 ++++++++++-- 3 files changed, 104 insertions(+), 18 deletions(-) diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go index bce254c8c..f1f8ffeac 100644 --- a/pkg/promclient/labelfilter.go +++ b/pkg/promclient/labelfilter.go @@ -2,44 +2,107 @@ package promclient import ( "context" + "fmt" "sync" + "sync/atomic" "time" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" + "github.com/sirupsen/logrus" ) -// TODO: config -func NewLabelFilterClient(a API) (*LabelFilterClient, error) { +type LabelFilterConfig struct { + LabelsToFilter []string `yaml:"labels_to_filter"` + SyncInterval time.Duration `yaml:"sync_interval"` +} + +func (c *LabelFilterConfig) Validate() error { + for _, l := range c.LabelsToFilter { + if !model.IsValidMetricName(model.LabelValue(l)) { + return fmt.Errorf("%s is not a valid label name", l) + } + } + + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *LabelFilterConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain LabelFilterConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + return c.Validate() +} + +// NewLabelFilterClient returns a LabelFilterClient which will filter the queries sent downstream based +// on a filter of labels maintained in memory from the downstream API. +func NewLabelFilterClient(ctx context.Context, a API, cfg *LabelFilterConfig) (*LabelFilterClient, error) { c := &LabelFilterClient{ - API: a, - LabelsToFilter: []string{"__name__", "job", "version"}, + API: a, + ctx: ctx, + cfg: cfg, } - // TODO; background - if err := c.Sync(context.TODO()); err != nil { + // Do an initial sync + if err := c.Sync(ctx); err != nil { return nil, err } + if cfg.SyncInterval > 0 { + go func() { + ticker := time.NewTicker(cfg.SyncInterval) + for { + select { + case <-ticker.C: + // TODO: metric on sync status + if err := c.Sync(ctx); err != nil { + logrus.Errorf("error syncing in label_filter from downstream: %#v", err) + } + case <-ctx.Done(): + ticker.Stop() + return + } + } + }() + } + return c, nil } -// LabelFilterClient proxies a client and adds the given labels to all results +// LabelFilterClient filters out calls to the downstream based on a label filter +// which is pulled and maintained from the downstream API. type LabelFilterClient struct { API LabelsToFilter []string // Which labels we want to pull to check - // TODO: move to a local block (to disk)?? or optionally so? - // labelFilter is a map of labelName -> labelValue -> nothing (for quick lookups) - labelFilter map[string]map[string]struct{} + // filter is an atomic to hold the LabelFilter which is a map of labelName -> labelValue -> nothing (for quick lookups) + filter atomic.Value + + // Used as the background context for this client + ctx context.Context + + // cfg is a pointer to the config for this client + cfg *LabelFilterConfig +} + +// State returns the current ServerGroupState +func (c *LabelFilterClient) LabelFilter() map[string]map[string]struct{} { + tmp := c.filter.Load() + if ret, ok := tmp.(map[string]map[string]struct{}); ok { + return ret + } + return nil } func (c *LabelFilterClient) Sync(ctx context.Context) error { filter := make(map[string]map[string]struct{}) - for _, label := range c.LabelsToFilter { + for _, label := range c.cfg.LabelsToFilter { labelFilter := make(map[string]struct{}) // TODO: warn? vals, _, err := c.LabelValues(ctx, label, nil, model.Time(0).Time(), model.Now().Time()) @@ -52,7 +115,7 @@ func (c *LabelFilterClient) Sync(ctx context.Context) error { filter[label] = labelFilter } - c.labelFilter = filter + c.filter.Store(filter) return nil } @@ -65,7 +128,7 @@ func (c *LabelFilterClient) Query(ctx context.Context, query string, ts time.Tim return nil, nil, err } - filterVisitor := NewFilterLabelVisitor(c.labelFilter) + filterVisitor := NewFilterLabelVisitor(c.LabelFilter()) if _, err := parser.Walk(ctx, filterVisitor, &parser.EvalStmt{Expr: e}, e, nil, nil); err != nil { return nil, nil, err } diff --git a/pkg/servergroup/config.go b/pkg/servergroup/config.go index e5b86f48a..7b0e58fa0 100644 --- a/pkg/servergroup/config.go +++ b/pkg/servergroup/config.go @@ -160,6 +160,9 @@ type Config struct { // An example use-case would be if a specific servergroup was was "deprecated" and wasn't getting // any new data after a specific given point in time AbsoluteTimeRangeConfig *AbsoluteTimeRangeConfig `yaml:"absolute_time_range"` + + // TODO: docs + LabelFilterConfig *promclient.LabelFilterConfig `yaml:"label_filter"` } // GetScheme returns the scheme for this servergroup diff --git a/pkg/servergroup/servergroup.go b/pkg/servergroup/servergroup.go index 5c4e4769b..cf2636f68 100644 --- a/pkg/servergroup/servergroup.go +++ b/pkg/servergroup/servergroup.go @@ -73,6 +73,9 @@ type ServerGroupState struct { // Targets is the list of target URLs for this discovery round Targets []string apiClient promclient.API + + ctx context.Context + ctxCancel context.CancelFunc } // ServerGroup encapsulates a set of prometheus downstreams to query/aggregate @@ -132,10 +135,17 @@ func (s *ServerGroup) Sync() { } } -func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgroup.Group) error { +func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgroup.Group) (err error) { targets := make([]string, 0) apiClients := make([]promclient.API, 0) + ctx, ctxCancel := context.WithCancel(context.Background()) + defer func() { + if err != nil { + ctxCancel() + } + }() + for _, targetGroupList := range targetGroupMap { for _, targetGroup := range targetGroupList { for _, target := range targetGroup.Targets { @@ -252,9 +262,12 @@ func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgrou apiClient = &promclient.DebugAPI{apiClient, u.String()} } - apiClient, err = promclient.NewLabelFilterClient(apiClient) - if err != nil { - return err + // Add LabelFilter if configured + if s.Cfg.LabelFilterConfig != nil { + apiClient, err = promclient.NewLabelFilterClient(ctx, apiClient, s.Cfg.LabelFilterConfig) + if err != nil { + return err + } } apiClients = append(apiClients, apiClient) @@ -271,16 +284,23 @@ func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgrou if err != nil { return err } + newState := &ServerGroupState{ Targets: targets, apiClient: apiClient, + ctx: ctx, + ctxCancel: ctxCancel, } if s.Cfg.IgnoreError { newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient} } - s.state.Store(newState) + oldState := s.State() // Fetch the current state (so we can stop it) + s.state.Store(newState) // Store new state + if oldState != nil { + oldState.ctxCancel() // Cancel the old state + } if !s.loaded { s.loaded = true From 0bd8cb11b0e1b577b636deb47533a6d6df501246 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 16 Mar 2023 14:48:39 -0700 Subject: [PATCH 03/10] Add metrics --- pkg/promclient/labelfilter.go | 31 +++++++++++++++++++++++++++++-- pkg/servergroup/servergroup.go | 2 +- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go index f1f8ffeac..14a4fdb17 100644 --- a/pkg/promclient/labelfilter.go +++ b/pkg/promclient/labelfilter.go @@ -8,11 +8,31 @@ import ( "time" v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" "github.com/sirupsen/logrus" ) +// Metrics +var ( + syncCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "promxy_label_filter_sync_count_total", + Help: "How many syncs completed from a promxy label_filter, partitioned by success", + }, []string{"status"}) + syncSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "promxy_label_filter_sync_duration_seconds", + Help: "Latency of sync process from a promxy label_fitler", + }, []string{"status"}) +) + +func init() { + prometheus.MustRegister( + syncCount, + syncSummary, + ) +} + type LabelFilterConfig struct { LabelsToFilter []string `yaml:"labels_to_filter"` SyncInterval time.Duration `yaml:"sync_interval"` @@ -58,10 +78,17 @@ func NewLabelFilterClient(ctx context.Context, a API, cfg *LabelFilterConfig) (* for { select { case <-ticker.C: - // TODO: metric on sync status - if err := c.Sync(ctx); err != nil { + start := time.Now() + err := c.Sync(ctx) + took := time.Since(start) + status := "success" + if err != nil { logrus.Errorf("error syncing in label_filter from downstream: %#v", err) + status = "error" } + syncCount.WithLabelValues(status).Inc() + syncSummary.WithLabelValues(status).Observe(took.Seconds()) + case <-ctx.Done(): ticker.Stop() return diff --git a/pkg/servergroup/servergroup.go b/pkg/servergroup/servergroup.go index cf2636f68..946e535c0 100644 --- a/pkg/servergroup/servergroup.go +++ b/pkg/servergroup/servergroup.go @@ -262,7 +262,7 @@ func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgrou apiClient = &promclient.DebugAPI{apiClient, u.String()} } - // Add LabelFilter if configured + // Add LabelFilter if configured if s.Cfg.LabelFilterConfig != nil { apiClient, err = promclient.NewLabelFilterClient(ctx, apiClient, s.Cfg.LabelFilterConfig) if err != nil { From aa1be8bb9475cd3a86640662fc467660669f0254 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 16 Mar 2023 14:56:21 -0700 Subject: [PATCH 04/10] Add exclude_labels option This set of labels is specifically excluded from the list we get from downstream. --- pkg/promclient/labelfilter.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go index 14a4fdb17..9f456d978 100644 --- a/pkg/promclient/labelfilter.go +++ b/pkg/promclient/labelfilter.go @@ -34,8 +34,9 @@ func init() { } type LabelFilterConfig struct { - LabelsToFilter []string `yaml:"labels_to_filter"` - SyncInterval time.Duration `yaml:"sync_interval"` + LabelsToFilter []string `yaml:"labels_to_filter"` + SyncInterval time.Duration `yaml:"sync_interval"` + ExcludeLabels map[string][]string `yaml:"exclude_labels"` } func (c *LabelFilterConfig) Validate() error { @@ -142,6 +143,16 @@ func (c *LabelFilterClient) Sync(ctx context.Context) error { filter[label] = labelFilter } + // Apply exclude list + for k, vList := range c.cfg.ExcludeLabels { + if filterMap, ok := filter[k]; ok { + for _, item := range vList { + delete(filterMap, item) + } + filter[k] = filterMap + } + } + c.filter.Store(filter) return nil From bff7e02bfd578dc043f7d38c04458607ffa845d7 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 16 Mar 2023 21:19:59 -0700 Subject: [PATCH 05/10] default to returning nil if override methods aren't defined --- pkg/promclient/multi_api_test.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/pkg/promclient/multi_api_test.go b/pkg/promclient/multi_api_test.go index 8c8338673..7440fc5f8 100644 --- a/pkg/promclient/multi_api_test.go +++ b/pkg/promclient/multi_api_test.go @@ -14,7 +14,7 @@ import ( type stubAPI struct { labelNames func() []string - labelValues func() model.LabelValues + labelValues func(label string) model.LabelValues query func() model.Value queryRange func() model.Value series func() []model.LabelSet @@ -24,36 +24,57 @@ type stubAPI struct { // LabelNames returns all the unique label names present in the block in sorted order. func (s *stubAPI) LabelNames(ctx context.Context, matchers []string, startTime time.Time, endTime time.Time) ([]string, v1.Warnings, error) { + if s.labelNames == nil { + return nil, nil, nil + } return s.labelNames(), nil, nil } // LabelValues performs a query for the values of the given label. func (s *stubAPI) LabelValues(ctx context.Context, label string, matchers []string, startTime time.Time, endTime time.Time) (model.LabelValues, v1.Warnings, error) { - return s.labelValues(), nil, nil + if s.labelValues == nil { + return nil, nil, nil + } + return s.labelValues(label), nil, nil } // Query performs a query for the given time. func (s *stubAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, v1.Warnings, error) { + if s.query == nil { + return nil, nil, nil + } return s.query(), nil, nil } // QueryRange performs a query for the given range. func (s *stubAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) { + if s.queryRange == nil { + return nil, nil, nil + } return s.queryRange(), nil, nil } // Series finds series by label matchers. func (s *stubAPI) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, v1.Warnings, error) { + if s.series == nil { + return nil, nil, nil + } return s.series(), nil, nil } // GetValue loads the raw data for a given set of matchers in the time range func (s *stubAPI) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, v1.Warnings, error) { + if s.getValue == nil { + return nil, nil, nil + } return s.getValue(), nil, nil } // Metadata returns metadata about metrics currently scraped by the metric name. func (s *stubAPI) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + if s.metadata == nil { + return nil, nil + } return s.metadata(), nil } @@ -130,7 +151,7 @@ func TestMultiAPIMerging(t *testing.T) { return []string{"a"} }, - labelValues: func() model.LabelValues { + labelValues: func(_ string) model.LabelValues { return model.LabelValues{} }, query: func() model.Value { From d3fb69d934b09f93b275a0606178288e4a7b86f9 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 16 Mar 2023 21:32:41 -0700 Subject: [PATCH 06/10] Add remaining implementation and tests --- pkg/promclient/labelfilter.go | 105 ++++++++++--- pkg/promclient/labelfilter_test.go | 236 +++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+), 19 deletions(-) create mode 100644 pkg/promclient/labelfilter_test.go diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go index 9f456d978..7cd9b1e8c 100644 --- a/pkg/promclient/labelfilter.go +++ b/pkg/promclient/labelfilter.go @@ -10,6 +10,7 @@ import ( v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/sirupsen/logrus" ) @@ -177,6 +178,65 @@ func (c *LabelFilterClient) Query(ctx context.Context, query string, ts time.Tim return c.API.Query(ctx, query, ts) } +// Query performs a query for the given time. +func (c *LabelFilterClient) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) { + // Parse out the promql query into expressions etc. + e, err := parser.ParseExpr(query) + if err != nil { + return nil, nil, err + } + + filterVisitor := NewFilterLabelVisitor(c.LabelFilter()) + if _, err := parser.Walk(ctx, filterVisitor, &parser.EvalStmt{Expr: e}, e, nil, nil); err != nil { + return nil, nil, err + } + if !filterVisitor.filterMatch { + return nil, nil, nil + } + + return c.API.QueryRange(ctx, query, r) +} + +// Series finds series by label matchers. +func (c *LabelFilterClient) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, v1.Warnings, error) { + for _, m := range matches { + matchers, err := parser.ParseMetricSelector(m) + if err != nil { + return nil, nil, err + } + // check if the matcher is excluded by our filter + for _, matcher := range matchers { + if !FilterLabelMatchers(c.LabelFilter(), matcher) { + return nil, nil, nil + } + } + } + return c.API.Series(ctx, matches, startTime, endTime) +} + +// GetValue loads the raw data for a given set of matchers in the time range +func (c *LabelFilterClient) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, v1.Warnings, error) { + // check if the matcher is excluded by our filter + for _, matcher := range matchers { + if !FilterLabelMatchers(c.LabelFilter(), matcher) { + return nil, nil, nil + } + } + return c.API.GetValue(ctx, start, end, matchers) +} + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (c *LabelFilterClient) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + matcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, metric) + if err != nil { + return nil, err + } + if !FilterLabelMatchers(c.LabelFilter(), matcher) { + return nil, nil + } + return c.API.Metadata(ctx, metric, limit) +} + func NewFilterLabelVisitor(filter map[string]map[string]struct{}) *FilterLabelVisitor { return &FilterLabelVisitor{ labelFilter: filter, @@ -196,28 +256,35 @@ func (l *FilterLabelVisitor) Visit(node parser.Node, path []parser.Node) (w pars switch nodeTyped := node.(type) { case *parser.VectorSelector: for _, matcher := range nodeTyped.LabelMatchers { - for labelName, labelFilter := range l.labelFilter { - if matcher.Name == labelName { - match := false - // Check that there is a match somewhere! - for v := range labelFilter { - if matcher.Matches(v) { - match = true - break - } - } - if !match { - l.l.Lock() - l.filterMatch = false - l.l.Unlock() - return nil, nil - } - } + if !FilterLabelMatchers(l.labelFilter, matcher) { + l.l.Lock() + l.filterMatch = false + l.l.Unlock() + return nil, nil } } - case *parser.MatrixSelector: - // TODO: } return l, nil } + +// TODO: better name, this is to check if a matcher is in the filter +func FilterLabelMatchers(filter map[string]map[string]struct{}, matcher *labels.Matcher) bool { + for labelName, labelFilter := range filter { + if matcher.Name == labelName { + match := false + // Check that there is a match somewhere! + for v := range labelFilter { + if matcher.Matches(v) { + match = true + break + } + } + if !match { + return match + } + } + } + + return true +} diff --git a/pkg/promclient/labelfilter_test.go b/pkg/promclient/labelfilter_test.go new file mode 100644 index 000000000..61bfc045f --- /dev/null +++ b/pkg/promclient/labelfilter_test.go @@ -0,0 +1,236 @@ +package promclient + +import ( + "context" + "strconv" + "testing" + "time" + + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" +) + +func newCountAPI(a API) *countAPI { + return &countAPI{ + API: a, + callCount: map[string]int{ + "LabelNames": 0, + "LabelValues": 0, + "Query": 0, + "QueryRange": 0, + "Series": 0, + "GetValue": 0, + "Metadata": 0, + }, + } +} + +type countAPI struct { + API + callCount map[string]int +} + +// LabelNames returns all the unique label names present in the block in sorted order. +func (s *countAPI) LabelNames(ctx context.Context, matchers []string, startTime time.Time, endTime time.Time) ([]string, v1.Warnings, error) { + s.callCount["LabelNames"]++ + return s.API.LabelNames(ctx, matchers, startTime, endTime) +} + +// LabelValues performs a query for the values of the given label. +func (s *countAPI) LabelValues(ctx context.Context, label string, matchers []string, startTime time.Time, endTime time.Time) (model.LabelValues, v1.Warnings, error) { + s.callCount["LabelValues"]++ + return s.API.LabelValues(ctx, label, matchers, startTime, endTime) +} + +// Query performs a query for the given time. +func (s *countAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, v1.Warnings, error) { + s.callCount["Query"]++ + return s.API.Query(ctx, query, ts) +} + +// QueryRange performs a query for the given range. +func (s *countAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) { + s.callCount["QueryRange"]++ + return s.API.QueryRange(ctx, query, r) +} + +// Series finds series by label matchers. +func (s *countAPI) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, v1.Warnings, error) { + s.callCount["Series"]++ + return s.API.Series(ctx, matches, startTime, endTime) +} + +// GetValue loads the raw data for a given set of matchers in the time range +func (s *countAPI) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, v1.Warnings, error) { + s.callCount["GetValue"]++ + return s.API.GetValue(ctx, start, end, matchers) +} + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (s *countAPI) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + s.callCount["Metadata"]++ + return s.API.Metadata(ctx, metric, limit) +} + +func TestLabelFilter(t *testing.T) { + /* + + The idea here is that the datasource has the following data: + + up{filterlabel="a"} + up{filterlabel="b"} + testmetric{filterlabel="a"} + testmetric{filterlabel="b"} + + */ + + stub := &stubAPI{ + // Override the LabelValues endpoint (which is the one that LabelFilter uses to determine its filter) + labelValues: func(label string) model.LabelValues { + switch label { + case "__name__": + return model.LabelValues{ + "up", + "testmetric", + } + case "filterlabel": + return model.LabelValues{ + "a", + "b", + } + } + return model.LabelValues{} + }, + } + + // Wrap the stub in a counter + countAPI := newCountAPI(stub) + + // Set up some vars + ctx := context.TODO() // TODO + + // Create the LabelFilter client + cfg := &LabelFilterConfig{ + LabelsToFilter: []string{"__name__", "filterlabel"}, + ExcludeLabels: map[string][]string{ + "__name__": {"up"}, + }, + } + + filterClient, err := NewLabelFilterClient(ctx, countAPI, cfg) + if err != nil { + t.Fatal(err) + } + + tests := []struct { + query string // query to run + callCount int // how many calls expected + }{ + {query: "notametric"}, // A metric that definitely doesn't exist + {query: "testmetric", callCount: 1}, // A metric that does exist + {query: "up"}, // A metric that does exist, but we filter out + {query: `{filterlabel="notavalue"}`}, // A metric that definitely doesn't exist + {query: `{notalabel="notavalue"}`, callCount: 1}, // A metric that definitely doesn't exist, but isn't filterable + {query: `{filterlabel="a"}`, callCount: 1}, // A metric that does exist + {query: `{filterlabel="b"}`, callCount: 1}, // A metric that does exist + } + + t.Run("Query", func(t *testing.T) { + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + beforeCount := countAPI.callCount["Query"] + _, _, err := filterClient.Query(ctx, test.query, model.Time(100).Time()) + if err != nil { + t.Fatal(err) + } + callCount := countAPI.callCount["Query"] - beforeCount + if test.callCount != callCount { + t.Fatalf("mismatch in callCount when running %s expected=%d actual=%d", test.query, test.callCount, callCount) + } + }) + } + }) + + t.Run("QueryRange", func(t *testing.T) { + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + beforeCount := countAPI.callCount["QueryRange"] + _, _, err := filterClient.QueryRange(ctx, test.query, v1.Range{Start: model.Time(0).Time(), End: model.Time(100).Time(), Step: time.Millisecond}) + if err != nil { + t.Fatal(err) + } + callCount := countAPI.callCount["QueryRange"] - beforeCount + if test.callCount != callCount { + t.Fatalf("mismatch in callCount when running %s expected=%d actual=%d", test.query, test.callCount, callCount) + } + }) + } + }) + + t.Run("Series", func(t *testing.T) { + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + beforeCount := countAPI.callCount["Series"] + _, _, err := filterClient.Series(ctx, []string{test.query}, model.Time(0).Time(), model.Time(100).Time()) + if err != nil { + t.Fatal(err) + } + callCount := countAPI.callCount["Series"] - beforeCount + if test.callCount != callCount { + t.Fatalf("mismatch in callCount when running %s expected=%d actual=%d", test.query, test.callCount, callCount) + } + }) + } + }) + + t.Run("GetValue", func(t *testing.T) { + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + beforeCount := countAPI.callCount["GetValue"] + + // TODO: convert query to matchers + matchers, err := parser.ParseMetricSelector(test.query) + if err != nil { + t.Fatal(err) + } + + _, _, err = filterClient.GetValue(ctx, model.Time(0).Time(), model.Time(100).Time(), matchers) + if err != nil { + t.Fatal(err) + } + callCount := countAPI.callCount["GetValue"] - beforeCount + if test.callCount != callCount { + t.Fatalf("mismatch in callCount when running %s expected=%d actual=%d", test.query, test.callCount, callCount) + } + }) + } + }) + + t.Run("Metadata", func(t *testing.T) { + tests := []struct { + metric string // query to run + callCount int // how many calls expected + }{ + {metric: "notametric"}, // A metric that definitely doesn't exist + {metric: "testmetric", callCount: 1}, // A metric that does exist + {metric: "up"}, // A metric that does exist, but we filter out + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + beforeCount := countAPI.callCount["Metadata"] + _, err := filterClient.Metadata(ctx, test.metric, "") + if err != nil { + t.Fatal(err) + } + callCount := countAPI.callCount["Metadata"] - beforeCount + if test.callCount != callCount { + t.Fatalf("mismatch in callCount when running %s expected=%d actual=%d", test.metric, test.callCount, callCount) + } + }) + } + }) + +} From d52eeb6c1545850f0fbfe68e0ca4d1e39e61b713 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 17 Mar 2023 10:19:56 -0700 Subject: [PATCH 07/10] Add metric for number of filtered requests --- pkg/promclient/labelfilter.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go index 7cd9b1e8c..3097fd539 100644 --- a/pkg/promclient/labelfilter.go +++ b/pkg/promclient/labelfilter.go @@ -25,6 +25,10 @@ var ( Name: "promxy_label_filter_sync_duration_seconds", Help: "Latency of sync process from a promxy label_fitler", }, []string{"status"}) + filteredCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "promxy_label_filter_filtered_count_total", + Help: "How many requests have been filtered from the downstream,, partitioned by query type", + }, []string{"type"}) ) func init() { @@ -172,6 +176,7 @@ func (c *LabelFilterClient) Query(ctx context.Context, query string, ts time.Tim return nil, nil, err } if !filterVisitor.filterMatch { + filteredCount.WithLabelValues("Query").Inc() return nil, nil, nil } @@ -191,6 +196,7 @@ func (c *LabelFilterClient) QueryRange(ctx context.Context, query string, r v1.R return nil, nil, err } if !filterVisitor.filterMatch { + filteredCount.WithLabelValues("QueryRange").Inc() return nil, nil, nil } @@ -207,6 +213,7 @@ func (c *LabelFilterClient) Series(ctx context.Context, matches []string, startT // check if the matcher is excluded by our filter for _, matcher := range matchers { if !FilterLabelMatchers(c.LabelFilter(), matcher) { + filteredCount.WithLabelValues("Series").Inc() return nil, nil, nil } } @@ -219,6 +226,7 @@ func (c *LabelFilterClient) GetValue(ctx context.Context, start, end time.Time, // check if the matcher is excluded by our filter for _, matcher := range matchers { if !FilterLabelMatchers(c.LabelFilter(), matcher) { + filteredCount.WithLabelValues("GetValue").Inc() return nil, nil, nil } } @@ -232,6 +240,7 @@ func (c *LabelFilterClient) Metadata(ctx context.Context, metric, limit string) return nil, err } if !FilterLabelMatchers(c.LabelFilter(), matcher) { + filteredCount.WithLabelValues("Metadata").Inc() return nil, nil } return c.API.Metadata(ctx, metric, limit) From c9f8af6e4be8e04161bb4c24c7774f7bdb1dea19 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 17 Mar 2023 10:29:06 -0700 Subject: [PATCH 08/10] Add static include labels --- pkg/promclient/labelfilter.go | 27 +++++++++++++++++++-------- pkg/promclient/labelfilter_test.go | 8 ++++++-- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go index 3097fd539..808bc1f31 100644 --- a/pkg/promclient/labelfilter.go +++ b/pkg/promclient/labelfilter.go @@ -39,13 +39,14 @@ func init() { } type LabelFilterConfig struct { - LabelsToFilter []string `yaml:"labels_to_filter"` - SyncInterval time.Duration `yaml:"sync_interval"` - ExcludeLabels map[string][]string `yaml:"exclude_labels"` + DynamicLabels []string `yaml:"dynamic_labels_include"` + SyncInterval time.Duration `yaml:"sync_interval"` + StaticLabelsInclude map[string][]string `yaml:"static_labels_include"` + StaticLabelsExclude map[string][]string `yaml:"static_labels_exclude"` } func (c *LabelFilterConfig) Validate() error { - for _, l := range c.LabelsToFilter { + for _, l := range c.DynamicLabels { if !model.IsValidMetricName(model.LabelValue(l)) { return fmt.Errorf("%s is not a valid label name", l) } @@ -111,8 +112,6 @@ func NewLabelFilterClient(ctx context.Context, a API, cfg *LabelFilterConfig) (* type LabelFilterClient struct { API - LabelsToFilter []string // Which labels we want to pull to check - // filter is an atomic to hold the LabelFilter which is a map of labelName -> labelValue -> nothing (for quick lookups) filter atomic.Value @@ -135,7 +134,7 @@ func (c *LabelFilterClient) LabelFilter() map[string]map[string]struct{} { func (c *LabelFilterClient) Sync(ctx context.Context) error { filter := make(map[string]map[string]struct{}) - for _, label := range c.cfg.LabelsToFilter { + for _, label := range c.cfg.DynamicLabels { labelFilter := make(map[string]struct{}) // TODO: warn? vals, _, err := c.LabelValues(ctx, label, nil, model.Time(0).Time(), model.Now().Time()) @@ -148,8 +147,20 @@ func (c *LabelFilterClient) Sync(ctx context.Context) error { filter[label] = labelFilter } + // Apply static include list + for k, vList := range c.cfg.StaticLabelsInclude { + filterMap, ok := filter[k] + if !ok { + filterMap = make(map[string]struct{}) + } + for _, item := range vList { + filterMap[item] = struct{}{} + } + filter[k] = filterMap + } + // Apply exclude list - for k, vList := range c.cfg.ExcludeLabels { + for k, vList := range c.cfg.StaticLabelsExclude { if filterMap, ok := filter[k]; ok { for _, item := range vList { delete(filterMap, item) diff --git a/pkg/promclient/labelfilter_test.go b/pkg/promclient/labelfilter_test.go index 61bfc045f..ded5d9fb7 100644 --- a/pkg/promclient/labelfilter_test.go +++ b/pkg/promclient/labelfilter_test.go @@ -113,8 +113,11 @@ func TestLabelFilter(t *testing.T) { // Create the LabelFilter client cfg := &LabelFilterConfig{ - LabelsToFilter: []string{"__name__", "filterlabel"}, - ExcludeLabels: map[string][]string{ + DynamicLabels: []string{"__name__", "filterlabel"}, + StaticLabelsInclude: map[string][]string{ + "__name__": {"staticinclude"}, + }, + StaticLabelsExclude: map[string][]string{ "__name__": {"up"}, }, } @@ -130,6 +133,7 @@ func TestLabelFilter(t *testing.T) { }{ {query: "notametric"}, // A metric that definitely doesn't exist {query: "testmetric", callCount: 1}, // A metric that does exist + {query: "staticinclude", callCount: 1}, // A metric that statically exists {query: "up"}, // A metric that does exist, but we filter out {query: `{filterlabel="notavalue"}`}, // A metric that definitely doesn't exist {query: `{notalabel="notavalue"}`, callCount: 1}, // A metric that definitely doesn't exist, but isn't filterable From 716ed2e12acb1e90f3451f2675af8b96e6149f1a Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 17 Mar 2023 10:41:46 -0700 Subject: [PATCH 09/10] Add docs --- pkg/promclient/labelfilter.go | 19 +++++++++++++++++-- pkg/servergroup/config.go | 25 ++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/pkg/promclient/labelfilter.go b/pkg/promclient/labelfilter.go index 808bc1f31..32493aa11 100644 --- a/pkg/promclient/labelfilter.go +++ b/pkg/promclient/labelfilter.go @@ -38,10 +38,21 @@ func init() { ) } +// LabelFilterConfig is the configuraiton for the LabelFilterClient type LabelFilterConfig struct { - DynamicLabels []string `yaml:"dynamic_labels_include"` - SyncInterval time.Duration `yaml:"sync_interval"` + // DynamicLabels is a list of labels to dynamically maintain a filter from the downstream from + DynamicLabels []string `yaml:"dynamic_labels"` + // SyncInterval defines how frequenlty to update the dynamic label filter + SyncInterval time.Duration `yaml:"sync_interval"` + // StaticLabelsInclude is a set of labels to always add to the downstream filter + // this allows you to define some metrics to be included statically if you want to + // avoid polling the downstream. + // NOTE: this is not a "secure" measure as this entire label_filter is based on matchers + // and as such doesn't restrict which metrics they touch (e.g. if you restrict by `__name__` + // the could just query by another label). StaticLabelsInclude map[string][]string `yaml:"static_labels_include"` + // StaticLabelsExclude is a set of labels to always exclude from the filter. This is done last + // so it will apply after the dynamic and static lists are added to the filter. StaticLabelsExclude map[string][]string `yaml:"static_labels_exclude"` } @@ -52,6 +63,10 @@ func (c *LabelFilterConfig) Validate() error { } } + if c.SyncInterval > 0 && len(c.DynamicLabels) == 0 { + return fmt.Errorf("sync_interval requires `dynamic_labels_include` to be set") + } + return nil } diff --git a/pkg/servergroup/config.go b/pkg/servergroup/config.go index 7b0e58fa0..088c6c4f7 100644 --- a/pkg/servergroup/config.go +++ b/pkg/servergroup/config.go @@ -161,7 +161,30 @@ type Config struct { // any new data after a specific given point in time AbsoluteTimeRangeConfig *AbsoluteTimeRangeConfig `yaml:"absolute_time_range"` - // TODO: docs + // LabelFilterConfig is a mechanism to restrict which queries are sent to the particular downstream. + // This is done by maintaining a "filter" of labels that are downstream and ensuring that the + // matchers for any particular query match that in-memory filter. This can be defined both + // statically and dynamically. + // NOTE: this is not a "secure" mechanism as it is relying on the query's matchers. So it is trivial + // for a malicious actor to work around this filter by changing matchers. + // Example: + // + // label_filter: + // # This will dynamically query the downstream for the values of `__name__` and `job` + // dynamic_labels: + // - __name__ + // - job + // # (optional) this will define a re-sync interval for dynamic labels from the downstream + // sync_interval: 5m + // # This will statically define a filter of labels + // static_labels_include: + // instance: + // - instance1 + // # This will statically define an exclusion list (removed from the filter)| + // static_labels_exclude: + // __name__: + // - up + LabelFilterConfig *promclient.LabelFilterConfig `yaml:"label_filter"` } From f4c2ca938c9cdc1815b27ea5697e1dddb84ccc01 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 17 Mar 2023 13:09:36 -0700 Subject: [PATCH 10/10] Fix race condition in CreateAndStart Today the CreateAndStart method starts the listener in a goroutine async. This is not a problem for regular operations, but during testing this creates a race condition. This changes the behavior so that listen will for-sure be started when the method returns and the hanlder will be in a background goroutine. --- pkg/server/api.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/server/api.go b/pkg/server/api.go index 4b0f9e2cb..3fae3cf9c 100644 --- a/pkg/server/api.go +++ b/pkg/server/api.go @@ -3,6 +3,7 @@ package server import ( "crypto/tls" "io" + "net" "net/http" "os" "time" @@ -17,17 +18,21 @@ import ( func CreateAndStart(bindAddr string, logFormat string, webReadTimeout time.Duration, accessLogOut io.Writer, router http.Handler, tlsConfigFile string) (*http.Server, error) { handler := createHandler(accessLogOut, router, logFormat) + ln, err := net.Listen("tcp", bindAddr) + if err != nil { + return nil, err + } srv := &http.Server{ - Addr: bindAddr, + Addr: ln.Addr().String(), Handler: handler, ReadTimeout: webReadTimeout, } if tlsConfigFile == "" { - return createAndStartHTTP(srv) + return createAndStartHTTP(srv, ln) } - return createAndStartHTTPS(srv, tlsConfigFile) + return createAndStartHTTPS(srv, ln, tlsConfigFile) } func createHandler(accessLogOut io.Writer, router http.Handler, logFormat string) http.Handler { @@ -46,12 +51,12 @@ func createHandler(accessLogOut io.Writer, router http.Handler, logFormat string return handler } -func createAndStartHTTP(srv *http.Server) (*http.Server, error) { +func createAndStartHTTP(srv *http.Server, ln net.Listener) (*http.Server, error) { srv.TLSConfig = nil go func() { logrus.Infof("promxy starting with HTTP...") - if err := srv.ListenAndServe(); err != nil { + if err := srv.Serve(ln); err != nil { if err == http.ErrServerClosed { return } @@ -61,7 +66,7 @@ func createAndStartHTTP(srv *http.Server) (*http.Server, error) { return srv, nil } -func createAndStartHTTPS(srv *http.Server, tlsConfigFile string) (*http.Server, error) { +func createAndStartHTTPS(srv *http.Server, ln net.Listener, tlsConfigFile string) (*http.Server, error) { tlsConfig, err := parseConfigFile(tlsConfigFile) if err != nil { return nil, err @@ -71,7 +76,7 @@ func createAndStartHTTPS(srv *http.Server, tlsConfigFile string) (*http.Server, go func() { logrus.Infof("promxy starting with TLS...") - if err := srv.ListenAndServeTLS("", ""); err != nil { + if err := srv.ServeTLS(ln, "", ""); err != nil { if err == http.ErrServerClosed { return }