From 538dee837e47bac0fca239673d98e3ac77cb82d6 Mon Sep 17 00:00:00 2001
From: Ramasai Venkatsitarambhaskar Tadepalli
Date: Sat, 6 Jul 2024 17:52:00 -0400
Subject: [PATCH 1/2] Add support to dynamically discover metrics configs
 across all namespaces in the cluster

---
 cmd/adapter/adapter.go               |  33 +++++--
 pkg/custom-provider/provider.go      | 134 +++++++++++++++++++++++-----
 pkg/custom-provider/provider_test.go |   2 +-
 3 files changed, 142 insertions(+), 27 deletions(-)

diff --git a/cmd/adapter/adapter.go b/cmd/adapter/adapter.go
index f5c819a05..58fa3c63b 100644
--- a/cmd/adapter/adapter.go
+++ b/cmd/adapter/adapter.go
@@ -19,6 +19,7 @@ package main
 import (
 	"crypto/tls"
 	"crypto/x509"
+	"errors"
 	"fmt"
 	"net/http"
 	"net/url"
@@ -81,8 +82,12 @@ type PrometheusAdapter struct {
 	// MetricsMaxAge is the period to query available metrics for
 	MetricsMaxAge time.Duration
 	// DisableHTTP2 indicates that http2 should not be enabled.
-	DisableHTTP2  bool
-	metricsConfig *adaptercfg.MetricsDiscoveryConfig
+	DisableHTTP2 bool
+	// EnableMetricsConfigsDiscovery indicates that metrics configs should be discovered dynamically from ConfigMaps
+	EnableMetricsConfigsDiscovery bool
+	// MetricsConfigsLabels is the label selector used to find metrics config ConfigMaps
+	MetricsConfigsLabels string
+	metricsConfig        *adaptercfg.MetricsDiscoveryConfig
 }
 
 func (cmd *PrometheusAdapter) makePromClient() (prom.Client, error) {
@@ -157,6 +162,10 @@ func (cmd *PrometheusAdapter) addFlags() {
 		"period for which to query the set of available metrics from Prometheus")
 	cmd.Flags().BoolVar(&cmd.DisableHTTP2, "disable-http2", cmd.DisableHTTP2,
 		"Disable HTTP/2 support")
+	cmd.Flags().BoolVar(&cmd.EnableMetricsConfigsDiscovery, "enable-metrics-configs-discovery", cmd.EnableMetricsConfigsDiscovery,
+		"Load metrics configuration dynamically by querying the cluster for ConfigMaps")
+	cmd.Flags().StringVar(&cmd.MetricsConfigsLabels, "metrics-configs-labels", cmd.MetricsConfigsLabels,
+		"Label selector used to filter ConfigMap objects when dynamically discovering metrics configuration")
 
 	// Add logging flags
 	logs.AddFlags(cmd.Flags())
@@ -165,7 +174,16 @@
 func (cmd *PrometheusAdapter) loadConfig() error {
 	// load metrics discovery configuration
 	if cmd.AdapterConfigFile == "" {
-		return fmt.Errorf("no metrics discovery configuration file specified (make sure to use --config)")
+		if !cmd.EnableMetricsConfigsDiscovery {
+			return fmt.Errorf("no metrics discovery configuration file specified and dynamic config discovery is disabled (use --config or --enable-metrics-configs-discovery)")
+		}
+		// Assign an empty metrics config to avoid nil pointer dereferences
+		cmd.metricsConfig = &adaptercfg.MetricsDiscoveryConfig{
+			Rules:         []adaptercfg.DiscoveryRule{},
+			ExternalRules: []adaptercfg.DiscoveryRule{},
+			ResourceRules: &adaptercfg.ResourceRules{},
+		}
+		return nil
 	}
 	metricsConfig, err := adaptercfg.FromFile(cmd.AdapterConfigFile)
 	if err != nil {
@@ -178,7 +196,7 @@
 }
 
 func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.CustomMetricsProvider, error) {
-	if len(cmd.metricsConfig.Rules) == 0 {
+	if len(cmd.metricsConfig.Rules) == 0 && !cmd.EnableMetricsConfigsDiscovery {
 		return nil, nil
 	}
 
@@ -203,7 +221,7 @@
 	}
 
 	// construct the provider and start it
-	cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge)
+	cmProvider, runner := cmprov.NewPrometheusProvider(mapper, dynClient, promClient, namers, cmd.MetricsRelistInterval, cmd.MetricsMaxAge, cmd.EnableMetricsConfigsDiscovery, cmd.MetricsConfigsLabels)
 	runner.RunUntil(stopCh)
 
 	return cmProvider, nil
@@ -340,6 +358,11 @@ func main() {
 		klog.Fatalf("unable to load metrics discovery config: %v", err)
 	}
 
+	// verify the dynamic metrics discovery flags
+	if cmd.EnableMetricsConfigsDiscovery && cmd.MetricsConfigsLabels == "" {
+		klog.Fatal(errors.New("metrics configs discovery is enabled, but no label selector was specified via --metrics-configs-labels"))
+	}
+
 	// stop channel closed on SIGTERM and SIGINT
 	stopCh := genericapiserver.SetupSignalHandler()
 
diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go
index 989042d52..45cb3b69d 100644
--- a/pkg/custom-provider/provider.go
+++ b/pkg/custom-provider/provider.go
@@ -19,7 +19,11 @@ package provider
 import (
 	"context"
 	"fmt"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"math"
+	adaptercfg "sigs.k8s.io/prometheus-adapter/pkg/config"
+	"sync/atomic"
 	"time"
 
 	pmodel "github.com/prometheus/common/model"
@@ -51,31 +55,44 @@
 type Runnable interface {
 	RunUntil(stopChan <-chan struct{})
 }
 
-type prometheusProvider struct {
-	mapper     apimeta.RESTMapper
+type kubeClientAndMapper struct {
 	kubeClient dynamic.Interface
+	mapper     apimeta.RESTMapper
+}
+
+type prometheusProvider struct {
 	promClient prom.Client
+	kubeClientAndMapper
 
 	SeriesRegistry
 }
 
-func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) {
+func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration, enableMetricsConfigsDiscovery bool, metricsConfigsLabels string) (provider.CustomMetricsProvider, Runnable) {
 	lister := &cachingMetricsLister{
-		updateInterval: updateInterval,
-		maxAge:         maxAge,
-		promClient:     promClient,
-		namers:         namers,
+		updateInterval:                updateInterval,
+		maxAge:                        maxAge,
+		promClient:                    promClient,
+		namers:                        namers,
+		enableMetricsConfigsDiscovery: enableMetricsConfigsDiscovery,
+		discoveredNamers:              atomic.Pointer[[]naming.MetricNamer]{},
+		metricsConfigsLabels:          metricsConfigsLabels,
 		SeriesRegistry: &basicSeriesRegistry{
 			mapper: mapper,
 		},
+		kubeClientAndMapper: kubeClientAndMapper{
+			kubeClient: kubeClient,
+			mapper:     mapper,
+		},
 	}
 
 	return &prometheusProvider{
-		mapper:     mapper,
-		kubeClient: kubeClient,
 		promClient: promClient,
+		kubeClientAndMapper: kubeClientAndMapper{
+			kubeClient: kubeClient,
+			mapper:     mapper,
+		},
 		SeriesRegistry: lister,
 	}, lister
 }
 
@@ -212,11 +229,15 @@ func (p *prometheusProvider) GetMetricBySelector(ctx context.Context, namespace
 
 type cachingMetricsLister struct {
 	SeriesRegistry
-
-	promClient     prom.Client
-	updateInterval time.Duration
-	maxAge         time.Duration
-	namers         []naming.MetricNamer
+	kubeClientAndMapper
+
+	promClient                    prom.Client
+	updateInterval                time.Duration
+	maxAge                        time.Duration
+	namers                        []naming.MetricNamer
+	enableMetricsConfigsDiscovery bool
+	discoveredNamers              atomic.Pointer[[]naming.MetricNamer]
+	metricsConfigsLabels          string
 }
 
 func (l *cachingMetricsLister) Run() {
@@ -239,15 +260,28 @@ type selectorSeries struct {
 func (l *cachingMetricsLister) updateMetrics() error {
 	startTime := pmodel.Now().Add(-1 * l.maxAge)
 
+	var allNamers []naming.MetricNamer
+
+	if l.enableMetricsConfigsDiscovery {
+		l.discoverMetricsConfigs()
+		allNamers = append(allNamers, l.namers...)
+		// the discovered-namers pointer stays nil until the first successful discovery
+		if discovered := l.discoveredNamers.Load(); discovered != nil {
+			allNamers = append(allNamers, *discovered...)
+		}
+	} else {
+		allNamers = l.namers
+	}
+
 	// don't do duplicate queries when it's just the matchers that change
 	seriesCacheByQuery := make(map[prom.Selector][]prom.Series)
 
 	// these can take a while on large clusters, so launch in parallel
 	// and don't duplicate
 	selectors := make(map[prom.Selector]struct{})
-	selectorSeriesChan := make(chan selectorSeries, len(l.namers))
-	errs := make(chan error, len(l.namers))
-	for _, namer := range l.namers {
+	selectorSeriesChan := make(chan selectorSeries, len(allNamers))
+	errs := make(chan error, len(allNamers))
+	for _, namer := range allNamers {
 		sel := namer.Selector()
 		if _, ok := selectors[sel]; ok {
 			errs <- nil
@@ -270,7 +304,7 @@
 	}
 
 	// iterate through, blocking until we've got all results
-	for range l.namers {
+	for range allNamers {
 		if err := <-errs; err != nil {
 			return fmt.Errorf("unable to update list of all metrics: %v", err)
 		}
@@ -280,8 +314,8 @@
 	}
 	close(errs)
 
-	newSeries := make([][]prom.Series, len(l.namers))
-	for i, namer := range l.namers {
+	newSeries := make([][]prom.Series, len(allNamers))
+	for i, namer := range allNamers {
 		series, cached := seriesCacheByQuery[namer.Selector()]
 		if !cached {
 			return fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
@@ -291,5 +325,63 @@
 
 	klog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)
 
-	return l.SetSeries(newSeries, l.namers)
-}
+	return l.SetSeries(newSeries, allNamers)
+}
+
+func (l *cachingMetricsLister) discoverMetricsConfigs() {
+	configmaps, err := l.kubeClient.Resource(corev1.SchemeGroupVersion.WithResource("configmaps")).Namespace("").List(context.TODO(), metav1.ListOptions{
+		LabelSelector: l.metricsConfigsLabels,
+	})
+	if err != nil {
+		klog.V(5).ErrorS(err, "Could not list ConfigMaps from the apiserver", "labelSelector", l.metricsConfigsLabels)
+		return
+	}
+
+	var discoveredNamers []naming.MetricNamer
+	var errs []error
+
+	for _, cm := range configmaps.Items {
+		var configmap corev1.ConfigMap
+		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(cm.UnstructuredContent(), &configmap); err != nil {
+			klog.V(5).ErrorS(err, "Could not convert unstructured ConfigMap to structured representation", "name", cm.GetName())
+			errs = append(errs, err)
+			continue
+		}
+
+		if configmap.Data == nil {
+			err := fmt.Errorf("ConfigMap %q has no data", configmap.Name)
+			klog.V(5).ErrorS(err, "Skipping ConfigMap", "name", configmap.Name)
+			errs = append(errs, err)
+			continue
+		}
+		c, ok := configmap.Data["config.yaml"]
+		if !ok {
+			err := fmt.Errorf("ConfigMap %q does not have the adapter YAML config under 'config.yaml'", configmap.Name)
+			klog.V(5).ErrorS(err, "Skipping ConfigMap", "name", configmap.Name)
+			errs = append(errs, err)
+			continue
+		}
+		metricsConfig, err := adaptercfg.FromYAML([]byte(c))
+		if err != nil {
+			klog.V(5).ErrorS(err, "Could not unmarshal metrics config", "name", configmap.Name)
+			errs = append(errs, err)
+			continue
+		}
+
+		namers, err := naming.NamersFromConfig(metricsConfig.Rules, l.mapper)
+		if err != nil {
+			klog.V(5).ErrorS(err, "Could not create metric namers from config", "name", configmap.Name)
+			errs = append(errs, err)
+			continue
+		}
+
+		discoveredNamers = append(discoveredNamers, namers...)
+	}
+
+	if len(errs) == 0 {
+		klog.V(5).Infof("Discovered %d namers; replacing the previously discovered set.", len(discoveredNamers))
+		l.discoveredNamers.Store(&discoveredNamers)
+	} else {
+		klog.V(5).Infof("Encountered %d errors while creating namers from discovered configs; keeping the previously discovered set.", len(errs))
+	}
+}
diff --git a/pkg/custom-provider/provider_test.go b/pkg/custom-provider/provider_test.go
index 9d1440049..ed5f4a444 100644
--- a/pkg/custom-provider/provider_test.go
+++ b/pkg/custom-provider/provider_test.go
@@ -45,7 +45,7 @@ func setupPrometheusProvider() (provider.CustomMetricsProvider, *fakeprom.FakePr
 	namers, err := naming.NamersFromConfig(cfg.Rules, restMapper())
 	Expect(err).NotTo(HaveOccurred())
 
-	prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval, fakeProviderStartDuration)
+	prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval, fakeProviderStartDuration, false, "")
 
 	containerSel := prom.MatchSeries("", prom.NameMatches("^container_.*"), prom.LabelNeq("container", "POD"), prom.LabelNeq("namespace", ""), prom.LabelNeq("pod", ""))
 	namespacedSel := prom.MatchSeries("", prom.LabelNeq("namespace", ""), prom.NameNotMatches("^container_.*"))

From 954db9122ebb3af243d64475ad349c97f263904f Mon Sep 17 00:00:00 2001
From: Ramasai Venkatsitarambhaskar Tadepalli
Date: Tue, 9 Jul 2024 10:18:54 -0400
Subject: [PATCH 2/2] Remove creation of second object

---
 pkg/custom-provider/provider.go | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/pkg/custom-provider/provider.go b/pkg/custom-provider/provider.go
index 45cb3b69d..b32d14c72 100644
--- a/pkg/custom-provider/provider.go
+++ b/pkg/custom-provider/provider.go
@@ -68,6 +68,10 @@ type prometheusProvider struct {
 }
 
 func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration, enableMetricsConfigsDiscovery bool, metricsConfigsLabels string) (provider.CustomMetricsProvider, Runnable) {
+	k := kubeClientAndMapper{
+		kubeClient: kubeClient,
+		mapper:     mapper,
+	}
 	lister := &cachingMetricsLister{
 		updateInterval:                updateInterval,
 		maxAge:                        maxAge,
@@ -80,20 +84,14 @@ func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interfa
 		SeriesRegistry: &basicSeriesRegistry{
 			mapper: mapper,
 		},
-		kubeClientAndMapper: kubeClientAndMapper{
-			kubeClient: kubeClient,
-			mapper:     mapper,
-		},
+		kubeClientAndMapper: k,
 	}
 
 	return &prometheusProvider{
 		promClient: promClient,
-		kubeClientAndMapper: kubeClientAndMapper{
-			kubeClient: kubeClient,
-			mapper:     mapper,
-		},
-		SeriesRegistry: lister,
+		kubeClientAndMapper: k,
+		SeriesRegistry:      lister,
 	}, lister
 }
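
Note on usage (a sketch, not part of the patches themselves): a ConfigMap that
this discovery mechanism would pick up must carry labels matching the selector
passed via --metrics-configs-labels and hold the adapter rules under the
"config.yaml" key. The label key/value below is an arbitrary assumption (any
selector works), while the "config.yaml" key and the rule format are what the
discovery code and the adapter's standard config parser expect. Also note that
discoverMetricsConfigs only consumes the "rules" section of each discovered
config; externalRules and resourceRules are ignored on this path.

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: team-a-adapter-rules          # hypothetical name
    namespace: team-a                   # any namespace; discovery is cluster-wide
    labels:
      prometheus-adapter/config: "true" # assumed label; must match --metrics-configs-labels
  data:
    config.yaml: |
      rules:
      - seriesQuery: 'http_requests_total{namespace!="",pod!=""}'
        resources:
          overrides:
            namespace: {resource: "namespace"}
            pod: {resource: "pod"}
        name:
          matches: "^(.*)_total$"
          as: "${1}_per_second"
        metricsQuery: 'sum(rate(<<.Series>>{<<.LabelMatchers>>}[2m])) by (<<.GroupBy>>)'

The adapter would then be started with flags along the lines of:

  --enable-metrics-configs-discovery --metrics-configs-labels='prometheus-adapter/config=true'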