Add new selector for pod and service monitor (open-telemetry#1256)
* Add new selector for pod and service monitor

* Use map, add tests

* Fix manifests

* Fixed make

* remove kustomization

* Changed based on feedback

* Docs

* config whoops
jaronoff97 authored Nov 18, 2022
1 parent d8b35ba commit 088504b
Showing 13 changed files with 294 additions and 47 deletions.
10 changes: 10 additions & 0 deletions apis/v1alpha1/opentelemetrycollector_types.go
@@ -183,6 +183,16 @@ type OpenTelemetryTargetAllocatorPrometheusCR struct {
// Enabled indicates whether to use a PrometheusOperator custom resources as targets or not.
// +optional
Enabled bool `json:"enabled,omitempty"`
// PodMonitors to be selected for target discovery.
// This is a map of {key,value} pairs. Each {key,value} in the map is going to exactly match a label in a
// PodMonitor's meta labels. The requirements are ANDed.
// +optional
PodMonitorSelector map[string]string `json:"podMonitorSelector,omitempty"`
// ServiceMonitors to be selected for target discovery.
// This is a map of {key,value} pairs. Each {key,value} in the map is going to exactly match a label in a
// ServiceMonitor's meta labels. The requirements are ANDed.
// +optional
ServiceMonitorSelector map[string]string `json:"serviceMonitorSelector,omitempty"`
}

// ScaleSubresourceStatus defines the observed state of the OpenTelemetryCollector's
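For context, here is a minimal Go sketch of how a client could populate the two new selector fields on the CRD type above. Only the field names and types come from the diff; the import path and the surrounding wiring are assumptions.

package main

import (
	"fmt"

	// Assumed module path for the operator's API types.
	"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
)

func main() {
	// Both selectors are plain {key: value} maps; every pair must exactly
	// match a meta label on the PodMonitor/ServiceMonitor (requirements are ANDed).
	promCR := v1alpha1.OpenTelemetryTargetAllocatorPrometheusCR{
		Enabled:                true,
		PodMonitorSelector:     map[string]string{"release": "test"},
		ServiceMonitorSelector: map[string]string{"release": "test"},
	}
	fmt.Printf("%+v\n", promCR)
}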
16 changes: 15 additions & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

16 changes: 16 additions & 0 deletions bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
@@ -1722,6 +1722,22 @@ spec:
description: Enabled indicates whether to use a PrometheusOperator
custom resources as targets or not.
type: boolean
podMonitorSelector:
additionalProperties:
type: string
description: PodMonitors to be selected for target discovery.
This is a map of {key,value} pairs. Each {key,value} in
the map is going to exactly match a label in a PodMonitor's
meta labels. The requirements are ANDed.
type: object
serviceMonitorSelector:
additionalProperties:
type: string
description: ServiceMonitors to be selected for target discovery.
This is a map of {key,value} pairs. Each {key,value} in
the map is going to exactly match a label in a ServiceMonitor's
meta labels. The requirements are ANDed.
type: object
type: object
replicas:
description: Replicas is the number of pod instances for the underlying
10 changes: 6 additions & 4 deletions cmd/otel-allocator/config/config.go
@@ -39,10 +39,12 @@ const DefaultResyncTime = 5 * time.Minute
const DefaultConfigFilePath string = "/conf/targetallocator.yaml"

type Config struct {
LabelSelector map[string]string `yaml:"label_selector,omitempty"`
Config *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
FilterStrategy *string `yaml:"filter_strategy,omitempty"`
LabelSelector map[string]string `yaml:"label_selector,omitempty"`
Config *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
FilterStrategy *string `yaml:"filter_strategy,omitempty"`
PodMonitorSelector map[string]string `yaml:"pod_monitor_selector,omitempty"`
ServiceMonitorSelector map[string]string `yaml:"service_monitor_selector,omitempty"`
}

func (c Config) GetAllocationStrategy() string {
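As a hedged illustration of how the new snake_case keys map onto these struct fields, the following self-contained sketch mirrors the yaml tags above using gopkg.in/yaml.v2; the allocator's actual YAML library and loading path may differ.

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Local stand-in that mirrors the allocator's new Config fields and their
// yaml tags, so the snippet stays self-contained.
type allocatorConfig struct {
	LabelSelector          map[string]string `yaml:"label_selector,omitempty"`
	PodMonitorSelector     map[string]string `yaml:"pod_monitor_selector,omitempty"`
	ServiceMonitorSelector map[string]string `yaml:"service_monitor_selector,omitempty"`
}

func main() {
	raw := []byte(`
label_selector:
  app.kubernetes.io/managed-by: opentelemetry-operator
pod_monitor_selector:
  release: test
service_monitor_selector:
  release: test
`)
	var cfg allocatorConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// Prints: test test
	fmt.Println(cfg.PodMonitorSelector["release"], cfg.ServiceMonitorSelector["release"])
}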
154 changes: 125 additions & 29 deletions cmd/otel-allocator/config/config_test.go
@@ -15,47 +15,143 @@
package config

import (
"fmt"
"testing"
"time"

commonconfig "github.com/prometheus/common/config"
promconfig "github.com/prometheus/prometheus/config"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/file"
"github.com/stretchr/testify/assert"
)

const testFile = "./testdata/config_test.yaml"

func TestConfigLoad(t *testing.T) {
expectedFileSDConfig := &file.SDConfig{
Files: []string{"./file_sd_test.json"},
RefreshInterval: model.Duration(300000000000),
func TestLoad(t *testing.T) {
type args struct {
file string
}
expectedStaticSDConfig := discovery.StaticConfig{
tests := []struct {
name string
args args
want Config
wantErr assert.ErrorAssertionFunc
}{
{
name: "file sd load",
args: args{
file: "./testdata/config_test.yaml",
},
want: Config{
LabelSelector: map[string]string{
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
},
Config: &promconfig.Config{
GlobalConfig: promconfig.GlobalConfig{
ScrapeInterval: model.Duration(60 * time.Second),
ScrapeTimeout: model.Duration(10 * time.Second),
EvaluationInterval: model.Duration(60 * time.Second),
},
ScrapeConfigs: []*promconfig.ScrapeConfig{
{
JobName: "prometheus",
HonorTimestamps: true,
ScrapeInterval: model.Duration(60 * time.Second),
ScrapeTimeout: model.Duration(10 * time.Second),
MetricsPath: "/metrics",
Scheme: "http",
HTTPClientConfig: commonconfig.HTTPClientConfig{
FollowRedirects: true,
},
ServiceDiscoveryConfigs: []discovery.Config{
&file.SDConfig{
Files: []string{"./file_sd_test.json"},
RefreshInterval: model.Duration(5 * time.Minute),
},
discovery.StaticConfig{
{
Targets: []model.LabelSet{
{model.AddressLabel: "prom.domain:9001"},
{model.AddressLabel: "prom.domain:9002"},
{model.AddressLabel: "prom.domain:9003"},
},
Labels: model.LabelSet{
"my": "label",
},
Source: "0",
},
},
},
},
},
},
},
wantErr: assert.NoError,
},
{
Targets: []model.LabelSet{
{model.AddressLabel: "prom.domain:9001"},
{model.AddressLabel: "prom.domain:9002"},
{model.AddressLabel: "prom.domain:9003"},
name: "service monitor pod monitor selector",
args: args{
file: "./testdata/pod_service_selector_test.yaml",
},
Labels: model.LabelSet{
"my": "label",
want: Config{
LabelSelector: map[string]string{
"app.kubernetes.io/instance": "default.test",
"app.kubernetes.io/managed-by": "opentelemetry-operator",
},
Config: &promconfig.Config{
GlobalConfig: promconfig.GlobalConfig{
ScrapeInterval: model.Duration(60 * time.Second),
ScrapeTimeout: model.Duration(10 * time.Second),
EvaluationInterval: model.Duration(60 * time.Second),
},
ScrapeConfigs: []*promconfig.ScrapeConfig{
{
JobName: "prometheus",
HonorTimestamps: true,
ScrapeInterval: model.Duration(60 * time.Second),
ScrapeTimeout: model.Duration(10 * time.Second),
MetricsPath: "/metrics",
Scheme: "http",
HTTPClientConfig: commonconfig.HTTPClientConfig{
FollowRedirects: true,
},
ServiceDiscoveryConfigs: []discovery.Config{
discovery.StaticConfig{
{
Targets: []model.LabelSet{
{model.AddressLabel: "prom.domain:9001"},
{model.AddressLabel: "prom.domain:9002"},
{model.AddressLabel: "prom.domain:9003"},
},
Labels: model.LabelSet{
"my": "label",
},
Source: "0",
},
},
},
},
},
},
PodMonitorSelector: map[string]string{
"release": "test",
},
ServiceMonitorSelector: map[string]string{
"release": "test",
},
},
Source: "0",
wantErr: assert.NoError,
},
}

cfg := Config{}
err := unmarshal(&cfg, testFile)
assert.NoError(t, err)

scrapeConfig := *cfg.Config.ScrapeConfigs[0]
actualFileSDConfig := scrapeConfig.ServiceDiscoveryConfigs[0]
actulaStaticSDConfig := scrapeConfig.ServiceDiscoveryConfigs[1]
t.Log(actulaStaticSDConfig)

assert.Equal(t, cfg.LabelSelector["app.kubernetes.io/instance"], "default.test")
assert.Equal(t, cfg.LabelSelector["app.kubernetes.io/managed-by"], "opentelemetry-operator")
assert.Equal(t, scrapeConfig.JobName, "prometheus")
assert.Equal(t, expectedFileSDConfig, actualFileSDConfig)
assert.Equal(t, expectedStaticSDConfig, actulaStaticSDConfig)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := Load(tt.args.file)
if !tt.wantErr(t, err, fmt.Sprintf("Load(%v)", tt.args.file)) {
return
}
assert.Equalf(t, tt.want, got, "Load(%v)", tt.args.file)
})
}
}
14 changes: 14 additions & 0 deletions cmd/otel-allocator/config/testdata/pod_service_selector_test.yaml
@@ -0,0 +1,14 @@
label_selector:
app.kubernetes.io/instance: default.test
app.kubernetes.io/managed-by: opentelemetry-operator
pod_monitor_selector:
release: test
service_monitor_selector:
release: test
config:
scrape_configs:
- job_name: prometheus
static_configs:
- targets: ["prom.domain:9001", "prom.domain:9002", "prom.domain:9003"]
labels:
my: label
2 changes: 1 addition & 1 deletion cmd/otel-allocator/main.go
@@ -83,7 +83,7 @@ func main() {
os.Exit(1)
}

watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator)
watcher, err := allocatorWatcher.NewWatcher(setupLog, cfg, cliConf, allocator)
if err != nil {
setupLog.Error(err, "Can't start the watchers")
os.Exit(1)
8 changes: 4 additions & 4 deletions cmd/otel-allocator/watcher/main.go
@@ -59,21 +59,21 @@ func (e EventSource) String() string {
return eventSourceToString[e]
}

func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator allocation.Allocator) (*Manager, error) {
func NewWatcher(logger logr.Logger, cfg config.Config, cliConfig config.CLIConfig, allocator allocation.Allocator) (*Manager, error) {
watcher := Manager{
allocator: allocator,
Events: make(chan Event),
Errors: make(chan error),
}

fileWatcher, err := newConfigMapWatcher(logger, config)
fileWatcher, err := newConfigMapWatcher(logger, cliConfig)
if err != nil {
return nil, err
}
watcher.watchers = append(watcher.watchers, &fileWatcher)

if *config.PromCRWatcherConf.Enabled {
promWatcher, err := newCRDMonitorWatcher(config)
if *cliConfig.PromCRWatcherConf.Enabled {
promWatcher, err := newCRDMonitorWatcher(cfg, cliConfig)
if err != nil {
return nil, err
}
34 changes: 26 additions & 8 deletions cmd/otel-allocator/watcher/promOperator.go
@@ -33,8 +33,8 @@ import (
"k8s.io/client-go/tools/cache"
)

func newCRDMonitorWatcher(config allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
mClient, err := monitoringclient.NewForConfig(config.ClusterConfig)
func newCRDMonitorWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
mClient, err := monitoringclient.NewForConfig(cliConfig.ClusterConfig)
if err != nil {
return nil, err
}
@@ -61,11 +61,17 @@ func newCRDMonitorWatcher(config allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
return nil, err
}

servMonSelector := getSelector(cfg.ServiceMonitorSelector)

podMonSelector := getSelector(cfg.PodMonitorSelector)

return &PrometheusCRWatcher{
kubeMonitoringClient: mClient,
informers: monitoringInformers,
stopChannel: make(chan struct{}),
configGenerator: generator,
kubeMonitoringClient: mClient,
informers: monitoringInformers,
stopChannel: make(chan struct{}),
configGenerator: generator,
serviceMonitorSelector: servMonSelector,
podMonitorSelector: podMonSelector,
}, nil
}

@@ -74,6 +80,17 @@ type PrometheusCRWatcher struct {
informers map[string]*informers.ForResource
stopChannel chan struct{}
configGenerator *prometheus.ConfigGenerator

serviceMonitorSelector labels.Selector
podMonitorSelector labels.Selector
}

func getSelector(s map[string]string) labels.Selector {
sel := labels.NewSelector()
if s == nil {
return sel
}
return labels.SelectorFromSet(s)
}

// Start wrapped informers and wait for an initial sync.
@@ -118,7 +135,8 @@ func (w *PrometheusCRWatcher) Close() error {

func (w *PrometheusCRWatcher) CreatePromConfig(kubeConfigPath string) (*promconfig.Config, error) {
serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor)
smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(labels.NewSelector(), func(sm interface{}) {

smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) {
monitor := sm.(*monitoringv1.ServiceMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
serviceMonitorInstances[key] = monitor
@@ -128,7 +146,7 @@ func (w *PrometheusCRWatcher) CreatePromConfig(kubeConfigPath string) (*promconfig.Config, error) {
}

podMonitorInstances := make(map[string]*monitoringv1.PodMonitor)
pmRetrieveErr := w.informers[monitoringv1.PodMonitorName].ListAll(labels.NewSelector(), func(pm interface{}) {
pmRetrieveErr := w.informers[monitoringv1.PodMonitorName].ListAll(w.podMonitorSelector, func(pm interface{}) {
monitor := pm.(*monitoringv1.PodMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
podMonitorInstances[key] = monitor
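The getSelector helper above returns an empty selector when no map is configured, which matches every PodMonitor/ServiceMonitor and so preserves the previous "select everything" behaviour; a populated map becomes a set-based selector whose requirements are ANDed, exactly as the API docs state. Below is a stand-alone sketch of that behaviour, assuming only k8s.io/apimachinery/pkg/labels; it is an illustration, not the code ListAll runs against the informers.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// Stand-alone version of the selector logic above, for illustration only.
func getSelector(s map[string]string) labels.Selector {
	if s == nil {
		return labels.NewSelector() // empty selector: matches every monitor
	}
	return labels.SelectorFromSet(s)
}

func main() {
	sel := getSelector(map[string]string{"release": "test"})
	fmt.Println(sel.Matches(labels.Set{"release": "test", "team": "obs"})) // true: all pairs match
	fmt.Println(sel.Matches(labels.Set{"release": "prod"}))                // false: value differs

	all := getSelector(nil)
	fmt.Println(all.Matches(labels.Set{"anything": "goes"})) // true: no requirements
}

Because SelectorFromSet builds an exact-match requirement for every key/value pair, adding more pairs only narrows the set of monitors the target allocator discovers.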