Adding support to dynamically discover metrics configs across all namespaces in the cluster #669

Open · wants to merge 2 commits into base: master
33 changes: 28 additions & 5 deletions cmd/adapter/adapter.go
@@ -19,6 +19,7 @@ package main
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"net/url"
@@ -81,8 +82,12 @@ type PrometheusAdapter struct {
// MetricsMaxAge is the period to query available metrics for
MetricsMaxAge time.Duration
// DisableHTTP2 indicates that http2 should not be enabled.
DisableHTTP2 bool
metricsConfig *adaptercfg.MetricsDiscoveryConfig
DisableHTTP2 bool
// EnableMetricsConfigsDiscovery enables loading metrics configuration dynamically from ConfigMaps discovered in the cluster
EnableMetricsConfigsDiscovery bool
// MetricsConfigsLabels is the label selector used to find those ConfigMaps
MetricsConfigsLabels string
metricsConfig *adaptercfg.MetricsDiscoveryConfig
}

func (cmd *PrometheusAdapter) makePromClient() (prom.Client, error) {
@@ -157,6 +162,10 @@ func (cmd *PrometheusAdapter) addFlags() {
"period for which to query the set of available metrics from Prometheus")
cmd.Flags().BoolVar(&cmd.DisableHTTP2, "disable-http2", cmd.DisableHTTP2,
"Disable HTTP/2 support")
cmd.Flags().BoolVar(&cmd.EnableMetricsConfigsDiscovery, "enable-metrics-configs-discovery", cmd.EnableMetricsConfigsDiscovery,
"Load metrics configuration dynamically by querying the cluster for configmaps")
cmd.Flags().StringVar(&cmd.MetricsConfigsLabels, "metrics-configs-labels", cmd.MetricsConfigsLabels,
"Labels to query on while filtering ConfigMap objects when dynamically discovering metrics configuration")

// Add logging flags
logs.AddFlags(cmd.Flags())
@@ -165,7 +174,16 @@ func (cmd *PrometheusAdapter) addFlags() {
func (cmd *PrometheusAdapter) loadConfig() error {
// load metrics discovery configuration
if cmd.AdapterConfigFile == "" {
return fmt.Errorf("no metrics discovery configuration file specified (make sure to use --config)")
if !cmd.EnableMetricsConfigsDiscovery {
return fmt.Errorf("loading dynamic config is turned off, and no metrics discovery configuration file specified (make sure to use --config)")
}
// Assign empty metrics config to prevent nilptr exceptions
cmd.metricsConfig = &adaptercfg.MetricsDiscoveryConfig{
Rules: []adaptercfg.DiscoveryRule{},
ExternalRules: []adaptercfg.DiscoveryRule{},
ResourceRules: &adaptercfg.ResourceRules{},
}
return nil
}
metricsConfig, err := adaptercfg.FromFile(cmd.AdapterConfigFile)
if err != nil {
@@ -178,7 +196,7 @@ func (cmd *PrometheusAdapter) loadConfig() error {
}

func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
if len(cmd.metricsConfig.Rules) == 0 {
if len(cmd.metricsConfig.Rules) == 0 && !cmd.EnableMetricsConfigsDiscovery {
return nil, nil
}

@@ -203,7 +221,7 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan
}

// construct the provider and start it
cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge)
cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge, cmd.EnableMetricsConfigsDiscovery, cmd.MetricsConfigsLabels)
runner.RunUntil(stopCh)

return cmProvider, nil
@@ -340,6 +358,11 @@ func main() {
klog.Fatalf("unable to load metrics discovery config: %v", err)
}

// validate the dynamic metrics configs discovery flags
if cmd.EnableMetricsConfigsDiscovery && cmd.MetricsConfigsLabels == "" {
klog.Fatal(errors.New("metrics configs discovery is enabled but --metrics-configs-labels was not specified"))
}

// stop channel closed on SIGTERM and SIGINT
stopCh := genericapiserver.SetupSignalHandler()

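Taken together, the adapter.go changes make --config optional when dynamic discovery is enabled and make --metrics-configs-labels mandatory in that mode; without either --config or --enable-metrics-configs-discovery, loadConfig still fails as before. A hypothetical invocation (only the two new flags come from this diff; the binary path and label value are illustrative):

    /adapter --enable-metrics-configs-discovery --metrics-configs-labels="prometheus-adapter/config=true"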
127 changes: 105 additions & 22 deletions pkg/custom-provider/provider.go
@@ -19,7 +19,11 @@ package provider
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"math"
adaptercfg "sigs.k8s.io/prometheus-adapter/pkg/config"
"sync/atomic"
"time"

pmodel "github.com/prometheus/common/model"
@@ -51,32 +55,43 @@ type Runnable interface {
RunUntil(stopChan <-chan struct{})
}

type prometheusProvider struct {
mapper apimeta.RESTMapper
type kubeClientAndMapper struct {
kubeClient dynamic.Interface
mapper apimeta.RESTMapper
}

type prometheusProvider struct {
promClient prom.Client

kubeClientAndMapper
SeriesRegistry
}

func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) {
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration, enableMetricsConfigsDiscovery bool, metricsConfigsLabels string) (provider.CustomMetricsProvider, Runnable) {
k := kubeClientAndMapper{
kubeClient: kubeClient,
mapper: mapper,
}
lister := &cachingMetricsLister{
updateInterval: updateInterval,
maxAge: maxAge,
promClient: promClient,
namers: namers,
updateInterval: updateInterval,
maxAge: maxAge,
promClient: promClient,
namers: namers,
enableMetricsConfigsDiscovery: enableMetricsConfigsDiscovery,
discoveredNamers: atomic.Pointer[[]naming.MetricNamer]{},
metricsConfigsLabels: metricsConfigsLabels,

SeriesRegistry: &basicSeriesRegistry{
mapper: mapper,
},
kubeClientAndMapper: k,
}

return &prometheusProvider{
mapper: mapper,
kubeClient: kubeClient,
promClient: promClient,

SeriesRegistry: lister,
kubeClientAndMapper: k,
SeriesRegistry: lister,
}, lister
}

@@ -212,11 +227,15 @@ func (p *prometheusProvider) GetMetricBySelector(ctx context.Context, namespace

type cachingMetricsLister struct {
SeriesRegistry

promClient prom.Client
updateInterval time.Duration
maxAge time.Duration
namers []naming.MetricNamer
kubeClientAndMapper

promClient prom.Client
updateInterval time.Duration
maxAge time.Duration
namers []naming.MetricNamer
enableMetricsConfigsDiscovery bool
discoveredNamers atomic.Pointer[[]naming.MetricNamer]
metricsConfigsLabels string
}

func (l *cachingMetricsLister) Run() {
@@ -239,15 +258,24 @@ type selectorSeries struct {
func (l *cachingMetricsLister) updateMetrics() error {
startTime := pmodel.Now().Add(-1 * l.maxAge)

var allNamers []naming.MetricNamer

if l.enableMetricsConfigsDiscovery {
l.discoverMetricsConfigs()
allNamers = l.namers
// the pointer is nil until the first successful discovery, so guard before dereferencing
if discovered := l.discoveredNamers.Load(); discovered != nil {
allNamers = append(allNamers, *discovered...)
}
} else {
allNamers = l.namers
}

// don't do duplicate queries when it's just the matchers that change
seriesCacheByQuery := make(map[prom.Selector][]prom.Series)

// these can take a while on large clusters, so launch in parallel
// and don't duplicate
selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.namers))
errs := make(chan error, len(l.namers))
for _, namer := range l.namers {
selectorSeriesChan := make(chan selectorSeries, len(allNamers))
errs := make(chan error, len(allNamers))
for _, namer := range allNamers {
sel := namer.Selector()
if _, ok := selectors[sel]; ok {
errs <- nil
@@ -270,7 +298,7 @@ }
}

// iterate through, blocking until we've got all results
for range l.namers {
for range allNamers {
if err := <-errs; err != nil {
return fmt.Errorf("unable to update list of all metrics: %v", err)
}
@@ -280,8 +308,8 @@ }
}
close(errs)

newSeries := make([][]prom.Series, len(l.namers))
for i, namer := range l.namers {
newSeries := make([][]prom.Series, len(allNamers))
for i, namer := range allNamers {
series, cached := seriesCacheByQuery[namer.Selector()]
if !cached {
return fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
@@ -291,5 +319,60 @@ }

klog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)

return l.SetSeries(newSeries, l.namers)
return l.SetSeries(newSeries, allNamers)
}

func (l *cachingMetricsLister) discoverMetricsConfigs() {
configmaps, err := l.kubeClient.Resource(corev1.SchemeGroupVersion.WithResource("configmaps")).Namespace("").List(context.TODO(), metav1.ListOptions{
LabelSelector: l.metricsConfigsLabels,
})
if err != nil {
klog.V(5).ErrorS(err, "Could not obtain configmaps from apiserver with label: ", "label", l.metricsConfigsLabels)
return
}

var discoveredNamers []naming.MetricNamer
var errs []error

for _, cm := range configmaps.Items {
var configmap corev1.ConfigMap
err := runtime.DefaultUnstructuredConverter.FromUnstructured(cm.UnstructuredContent(), &configmap)
if err != nil {
klog.V(5).ErrorS(err, "Could not convert unstructured ConfigMap to structured representation", "name", cm.GetName())
errs = append(errs, err)
continue
}

if configmap.Data == nil {
err = fmt.Errorf("ConfigMap %s does not have any data in it", configmap.ObjectMeta.Name)
klog.V(5).ErrorS(err, "Skipping ConfigMap without data")
errs = append(errs, err)
continue
}
c, ok := configmap.Data["config.yaml"]
if !ok {
err = fmt.Errorf("ConfigMap %s does not have the adapter YAML config under 'config.yaml'", configmap.ObjectMeta.Name)
klog.V(5).ErrorS(err, "Skipping ConfigMap without 'config.yaml' key")
errs = append(errs, err)
continue
}
metricsConfig, err := adaptercfg.FromYAML([]byte(c))
if err != nil {
klog.V(5).ErrorS(err, "Could not unmarshal metrics config for name="+configmap.ObjectMeta.Name)
errs = append(errs, err)
continue
}

namers, err := naming.NamersFromConfig(metricsConfig.Rules, l.mapper)
if err != nil {
klog.V(5).ErrorS(err, "Could not create a metric namer from given config for name="+configmap.ObjectMeta.Name)
errs = append(errs, err)
continue
}

discoveredNamers = append(discoveredNamers, namers...)
}

if len(errs) == 0 {
klog.V(5).Infof("Found %d namers, replacing the old namers with the new ones.", len(discoveredNamers))
l.discoveredNamers.Store(&discoveredNamers)
} else {
klog.V(5).Infof("Found errors while creating namers from config -- not updating the existing list of dynamically discovered namers.")
}
}
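For context, discoverMetricsConfigs lists ConfigMaps across all namespaces that match --metrics-configs-labels, parses only the config.yaml key with adaptercfg.FromYAML, and turns the resulting Rules into namers. A minimal sketch of a ConfigMap the loop would pick up, built with the client-go types used above (the name, namespace, label key/value, and the rule itself are illustrative; only the config.yaml key and the label-selector requirement come from this diff):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A ConfigMap shaped the way discoverMetricsConfigs expects: matching labels
	// and an adapter rules document under the "config.yaml" key.
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "team-a-adapter-rules", // illustrative
			Namespace: "team-a",               // any namespace; the lister queries all of them
			Labels: map[string]string{
				"prometheus-adapter/config": "true", // must satisfy --metrics-configs-labels
			},
		},
		Data: map[string]string{
			// only this key is read; its value is parsed with adaptercfg.FromYAML
			"config.yaml": `rules:
- seriesQuery: 'http_requests_total{namespace!="",pod!=""}'
  resources:
    overrides:
      namespace: {resource: "namespace"}
      pod: {resource: "pod"}
  name:
    matches: "^(.*)_total$"
    as: "${1}_per_second"
  metricsQuery: 'sum(rate(<<.Series>>{<<.LabelMatchers>>}[2m])) by (<<.GroupBy>>)'
`,
		},
	}
	fmt.Printf("would be discovered: %s/%s\n", cm.Namespace, cm.Name)
}
```

Applying such a ConfigMap in any namespace would cause its rules to be merged with the statically configured namers on the next relist, per updateMetrics above.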
2 changes: 1 addition & 1 deletion pkg/custom-provider/provider_test.go
@@ -45,7 +45,7 @@ func setupPrometheusProvider() (provider.CustomMetricsProvider, *fakeprom.FakePr
namers, err := naming.NamersFromConfig(cfg.Rules, restMapper())
Expect(err).NotTo(HaveOccurred())

prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval, fakeProviderStartDuration)
prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval, fakeProviderStartDuration, false, "")

containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod", ""))
namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))