diff --git a/manifests/03-clusterrole.yaml b/manifests/03-clusterrole.yaml
index 0e2a71831..56c38945a 100644
--- a/manifests/03-clusterrole.yaml
+++ b/manifests/03-clusterrole.yaml
@@ -103,6 +103,15 @@ rules:
       - get
       - list
       - delete
+  - apiGroups:
+      - ""
+    resources:
+      - configmaps
+    verbs:
+      - get
+      - list
+      - watch
+
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: RoleBinding
@@ -298,6 +307,13 @@ rules:
     verbs:
       - get
       - list
+  - apiGroups:
+      - ""
+    resources:
+      - secrets
+    verbs:
+      - get
+      - list
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
diff --git a/pkg/config/configobserver/config_aggregator.go b/pkg/config/configobserver/config_aggregator.go
new file mode 100644
index 000000000..941fa2308
--- /dev/null
+++ b/pkg/config/configobserver/config_aggregator.go
@@ -0,0 +1,205 @@
+package configobserver
+
+import (
+	"context"
+	"sync"
+
+	"github.com/openshift/insights-operator/pkg/config"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
+)
+
+const insightsNamespaceName = "openshift-insights"
+
+type Interface interface {
+	Config() *config.InsightsConfiguration
+	ConfigChanged() (<-chan struct{}, func())
+	Listen(ctx context.Context)
+}
+
+// ConfigAggregator is an auxiliary structure that aggregates the legacy
+// "support" secret configuration with the new "insights-config" configmap
+// configuration, so that consumers no longer need to query both sources.
+type ConfigAggregator struct {
+	lock               sync.Mutex
+	legacyConfigurator Configurator
+	configMapInformer  ConfigMapInformer
+	config             *config.InsightsConfiguration
+	listeners          map[chan struct{}]struct{}
+	usingInformer      bool
+	kubeClient         kubernetes.Interface
+}
+
+func NewConfigAggregator(ctrl Configurator, configMapInf ConfigMapInformer) Interface {
+	confAggreg := &ConfigAggregator{
+		legacyConfigurator: ctrl,
+		configMapInformer:  configMapInf,
+		listeners:          make(map[chan struct{}]struct{}),
+		usingInformer:      true,
+	}
+	confAggreg.mergeUsingInformer()
+	return confAggreg
+}
+
+// NewStaticConfigAggregator is a constructor used mainly for reading the configuration
+// in techpreview. There is no reason to create and start any informer in techpreview
+// when data gathering runs as a job. It is sufficient to read the config once,
+// when the job is created and/or started.
+func NewStaticConfigAggregator(ctrl Configurator, cli kubernetes.Interface) Interface {
+	confAggreg := &ConfigAggregator{
+		legacyConfigurator: ctrl,
+		configMapInformer:  nil,
+		kubeClient:         cli,
+	}
+
+	confAggreg.mergeStatically()
+	return confAggreg
+}
+
+// mergeUsingInformer merges config values from the legacy "support" secret configuration and
+// from the new configmap informer. The "insights-config" configmap always takes
+// precedence if it exists and is not empty.
+func (c *ConfigAggregator) mergeUsingInformer() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	newConf := c.configMapInformer.Config()
+	conf := c.legacyConfigToInsightsConfiguration()
+
+	if newConf == nil {
+		c.config = conf
+		return
+	}
+
+	c.merge(conf, newConf)
+}
+
+// mergeStatically merges config values from the legacy "support" secret configuration and
+// from the "insights-config" configmap by getting and reading the configmap directly,
+// without using an informer.
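+//
+// For illustration, a configmap of the following shape overrides the matching
+// legacy values (the endpoint value is a placeholder):
+//
+//	apiVersion: v1
+//	kind: ConfigMap
+//	metadata:
+//	  name: insights-config
+//	  namespace: openshift-insights
+//	data:
+//	  config.yaml: |
+//	    dataReporting:
+//	      interval: 1h
+//	      uploadEndpoint: https://example.upload/endpoint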
+func (c *ConfigAggregator) mergeStatically() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	conf := c.legacyConfigToInsightsConfiguration()
+	c.config = conf
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	cm, err := c.kubeClient.CoreV1().ConfigMaps(insightsNamespaceName).Get(ctx, insightsConfigMapName, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return
+		}
+		// keep the legacy configuration on any other error
+		klog.Error(err)
+		return
+	}
+
+	cmConf, err := readConfigAndDecode(cm)
+	if err != nil {
+		klog.Errorf("Failed to read configmap configuration: %v", err)
+		return
+	}
+
+	c.merge(conf, cmConf)
+}
+
+func (c *ConfigAggregator) merge(defaultCfg, newCfg *config.InsightsConfiguration) {
+	// read config map values and merge
+	if newCfg.DataReporting.Interval != 0 {
+		defaultCfg.DataReporting.Interval = newCfg.DataReporting.Interval
+	}
+
+	if newCfg.DataReporting.UploadEndpoint != "" {
+		defaultCfg.DataReporting.UploadEndpoint = newCfg.DataReporting.UploadEndpoint
+	}
+
+	if newCfg.DataReporting.DownloadEndpoint != "" {
+		defaultCfg.DataReporting.DownloadEndpoint = newCfg.DataReporting.DownloadEndpoint
+	}
+
+	if newCfg.DataReporting.DownloadEndpointTechPreview != "" {
+		defaultCfg.DataReporting.DownloadEndpointTechPreview = newCfg.DataReporting.DownloadEndpointTechPreview
+	}
+
+	if newCfg.DataReporting.ProcessingStatusEndpoint != "" {
+		defaultCfg.DataReporting.ProcessingStatusEndpoint = newCfg.DataReporting.ProcessingStatusEndpoint
+	}
+
+	if newCfg.DataReporting.ConditionalGathererEndpoint != "" {
+		defaultCfg.DataReporting.ConditionalGathererEndpoint = newCfg.DataReporting.ConditionalGathererEndpoint
+	}
+
+	if newCfg.DataReporting.StoragePath != "" {
+		defaultCfg.DataReporting.StoragePath = newCfg.DataReporting.StoragePath
+	}
+	c.config = defaultCfg
+}
+
+func (c *ConfigAggregator) Config() *config.InsightsConfiguration {
+	if c.usingInformer {
+		c.mergeUsingInformer()
+	} else {
+		c.mergeStatically()
+	}
+	return c.config
+}
+
+// Listen listens to the legacy Secret configurator/observer as well as the
+// new config map informer. When any configuration change is observed, all the
+// listeners are notified.
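+//
+// A minimal usage sketch (assuming an already wired aggregator):
+//
+//	aggregator := NewConfigAggregator(secretObserver, configMapObserver)
+//	go aggregator.Listen(ctx)
+//	ch, closeFn := aggregator.ConfigChanged()
+//	defer closeFn()
+//	<-ch // fires on a change from either source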
+func (c *ConfigAggregator) Listen(ctx context.Context) { + legacyCh, legacyCloseFn := c.legacyConfigurator.ConfigChanged() + cmCh, cmICloseFn := c.configMapInformer.ConfigChanged() + defer func() { + legacyCloseFn() + cmICloseFn() + }() + + for { + select { + case <-legacyCh: + c.notifyListeners() + case <-cmCh: + c.notifyListeners() + case <-ctx.Done(): + return + } + } +} + +func (c *ConfigAggregator) notifyListeners() { + for ch := range c.listeners { + ch <- struct{}{} + } +} + +func (c *ConfigAggregator) ConfigChanged() (configCh <-chan struct{}, closeFn func()) { + c.lock.Lock() + defer c.lock.Unlock() + ch := make(chan struct{}, 1) + c.listeners[ch] = struct{}{} + return ch, func() { + c.lock.Lock() + defer c.lock.Unlock() + close(ch) + delete(c.listeners, ch) + } +} + +func (c *ConfigAggregator) legacyConfigToInsightsConfiguration() *config.InsightsConfiguration { + legacyConfig := c.legacyConfigurator.Config() + return &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + Interval: legacyConfig.Interval, + UploadEndpoint: legacyConfig.Endpoint, + DownloadEndpoint: legacyConfig.ReportEndpoint, + ConditionalGathererEndpoint: legacyConfig.ConditionalGathererEndpoint, + ProcessingStatusEndpoint: legacyConfig.ProcessingStatusEndpoint, + DownloadEndpointTechPreview: legacyConfig.ReportEndpointTechPreview, + // This can't be overridden by the config map - it's not merged in the merge function. + // The value is based on the presence of the token in the pull-secret and the config map + // doesn't know anything about secrets + Enabled: legacyConfig.Report, + StoragePath: legacyConfig.StoragePath, + ReportPullingDelay: legacyConfig.ReportPullingDelay, + }, + } +} diff --git a/pkg/config/configobserver/config_aggregator_test.go b/pkg/config/configobserver/config_aggregator_test.go new file mode 100644 index 000000000..67534efae --- /dev/null +++ b/pkg/config/configobserver/config_aggregator_test.go @@ -0,0 +1,239 @@ +package configobserver + +import ( + "testing" + "time" + + "github.com/openshift/insights-operator/pkg/config" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubefake "k8s.io/client-go/kubernetes/fake" +) + +func TestMergeStatically(t *testing.T) { + tests := []struct { + name string + configCM *v1.ConfigMap + legacyConfig config.Controller + expectedConfig *config.InsightsConfiguration + }{ + { + name: "No config map exists - legacy config is used", + configCM: nil, + legacyConfig: config.Controller{ + Report: true, + StoragePath: "/foo/bar/", + Endpoint: "http://testing.here", + ReportEndpoint: "http://reportendpoint.here", + Interval: 2 * time.Hour, + ConditionalGathererEndpoint: "http://conditionalendpoint.here", + }, + expectedConfig: &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + Enabled: true, + UploadEndpoint: "http://testing.here", + StoragePath: "/foo/bar/", + DownloadEndpoint: "http://reportendpoint.here", + Interval: 2 * time.Hour, + ConditionalGathererEndpoint: "http://conditionalendpoint.here", + }, + }, + }, + { + name: "Config map exists and overrides legacy config", + configCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: insightsConfigMapName, + Namespace: "openshift-insights", + }, + Data: map[string]string{ + "config.yaml": ` +dataReporting: + interval: 1h + uploadEndpoint: https://overriden.upload/endpoint + storagePath: /var/lib/test + downloadEndpoint: 
https://overriden.download/endpoint + conditionalGathererEndpoint: https://overriden.conditional/endpoint + processingStatusEndpoint: https://overriden.status/endpoint + downloadEndpointTechPreview: https://overriden.downloadtechpreview/endpoint`, + }, + }, + legacyConfig: config.Controller{ + Report: true, + StoragePath: "/foo/bar/", + Endpoint: "http://testing.here", + ReportEndpoint: "http://reportendpoint.here", + Interval: 2 * time.Hour, + ConditionalGathererEndpoint: "http://conditionalendpoint.here", + ProcessingStatusEndpoint: "http://statusendpoint.here", + ReportEndpointTechPreview: "http://downloadtpendpoint.here", + }, + expectedConfig: &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + Enabled: true, + UploadEndpoint: "https://overriden.upload/endpoint", + StoragePath: "/var/lib/test", + DownloadEndpoint: "https://overriden.download/endpoint", + Interval: 1 * time.Hour, + ConditionalGathererEndpoint: "https://overriden.conditional/endpoint", + ProcessingStatusEndpoint: "https://overriden.status/endpoint", + DownloadEndpointTechPreview: "https://overriden.downloadtechpreview/endpoint", + }, + }, + }, + { + name: "Config map cannot override \"Report\" bool attribute", + configCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: insightsConfigMapName, + Namespace: "openshift-insights", + }, + Data: map[string]string{ + "config.yaml": ` +dataReporting: + enabled: true + uploadEndpoint: https://overriden.upload/endpoint`, + }, + }, + legacyConfig: config.Controller{ + Report: false, + Endpoint: "http://testing.here", + }, + expectedConfig: &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + Enabled: false, + UploadEndpoint: "https://overriden.upload/endpoint", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var cs *kubefake.Clientset + if tt.configCM != nil { + cs = kubefake.NewSimpleClientset(tt.configCM) + } else { + cs = kubefake.NewSimpleClientset() + } + mockSecretConf := config.NewMockSecretConfigurator(&tt.legacyConfig) + staticAggregator := NewStaticConfigAggregator(mockSecretConf, cs) + + testConfig := staticAggregator.Config() + assert.Equal(t, tt.expectedConfig, testConfig) + }) + } +} + +func TestMergeUsingInformer(t *testing.T) { + tests := []struct { + name string + configFromInf config.InsightsConfiguration + legacyConfig config.Controller + expectedConfig *config.InsightsConfiguration + }{ + { + name: "No config map exists - legacy config is used", + configFromInf: config.InsightsConfiguration{}, + legacyConfig: config.Controller{ + Report: true, + StoragePath: "/foo/bar/", + Endpoint: "http://testing.here", + ReportEndpoint: "http://reportendpoint.here", + Interval: 2 * time.Hour, + ConditionalGathererEndpoint: "http://conditionalendpoint.here", + }, + expectedConfig: &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + Enabled: true, + UploadEndpoint: "http://testing.here", + StoragePath: "/foo/bar/", + DownloadEndpoint: "http://reportendpoint.here", + Interval: 2 * time.Hour, + ConditionalGathererEndpoint: "http://conditionalendpoint.here", + }, + }, + }, + { + name: "Config map exists and overrides legacy config", + configFromInf: config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + Interval: 1 * time.Hour, + UploadEndpoint: "https://overriden.upload/endpoint", + StoragePath: "/var/lib/test", + DownloadEndpoint: "https://overriden.download/endpoint", + ConditionalGathererEndpoint: "https://overriden.conditional/endpoint", + }, + }, + 
+			legacyConfig: config.Controller{
+				Report:                      true,
+				StoragePath:                 "/foo/bar/",
+				Endpoint:                    "http://testing.here",
+				ReportEndpoint:              "http://reportendpoint.here",
+				Interval:                    2 * time.Hour,
+				ConditionalGathererEndpoint: "http://conditionalendpoint.here",
+			},
+			expectedConfig: &config.InsightsConfiguration{
+				DataReporting: config.DataReporting{
+					Enabled:                     true,
+					UploadEndpoint:              "https://overriden.upload/endpoint",
+					StoragePath:                 "/var/lib/test",
+					DownloadEndpoint:            "https://overriden.download/endpoint",
+					Interval:                    1 * time.Hour,
+					ConditionalGathererEndpoint: "https://overriden.conditional/endpoint",
+				},
+			},
+		},
+		{
+			name: "Config map cannot override \"Report\" bool attribute",
+			configFromInf: config.InsightsConfiguration{
+				DataReporting: config.DataReporting{
+					Enabled:        true,
+					UploadEndpoint: "https://overriden.upload/endpoint",
+				},
+			},
+			legacyConfig: config.Controller{
+				Report:   false,
+				Endpoint: "http://testing.here",
+			},
+			expectedConfig: &config.InsightsConfiguration{
+				DataReporting: config.DataReporting{
+					Enabled:        false,
+					UploadEndpoint: "https://overriden.upload/endpoint",
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			mockSecretConf := config.NewMockSecretConfigurator(&tt.legacyConfig)
+			mockConfigMapInf := NewMockConfigMapInformer(&tt.configFromInf)
+			informerAggregator := NewConfigAggregator(mockSecretConf, mockConfigMapInf)
+
+			testConfig := informerAggregator.Config()
+			assert.Equal(t, tt.expectedConfig, testConfig)
+		})
+	}
+}
+
+type MockConfigMapInformer struct {
+	factory.Controller
+	config *config.InsightsConfiguration
+}
+
+func NewMockConfigMapInformer(cfg *config.InsightsConfiguration) *MockConfigMapInformer {
+	return &MockConfigMapInformer{
+		config: cfg,
+	}
+}
+
+func (m *MockConfigMapInformer) Config() *config.InsightsConfiguration {
+	return m.config
+}
+
+func (m *MockConfigMapInformer) ConfigChanged() (configCh <-chan struct{}, closeFn func()) {
+	// return a no-op close function so that deferred closeFn calls are safe
+	return nil, func() {}
+}
diff --git a/pkg/config/configobserver/configmapobserver.go b/pkg/config/configobserver/configmapobserver.go
new file mode 100644
index 000000000..14ceeccfc
--- /dev/null
+++ b/pkg/config/configobserver/configmapobserver.go
@@ -0,0 +1,133 @@
+package configobserver
+
+import (
+	"context"
+	"reflect"
+	"sync"
+	"time"
+
+	"github.com/openshift/insights-operator/pkg/config"
+	"github.com/openshift/library-go/pkg/controller/factory"
+	"github.com/openshift/library-go/pkg/operator/events"
+	"github.com/openshift/library-go/pkg/operator/v1helpers"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/yaml"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+)
+
+const (
+	insightsConfigMapName = "insights-config"
+)
+
+type ConfigMapInformer interface {
+	factory.Controller
+	// Config provides the actual Insights configuration values from the "insights-config" configmap
+	Config() *config.InsightsConfiguration
+	// ConfigChanged notifies all the listeners that the content of the "insights-config" configmap has changed
+	ConfigChanged() (<-chan struct{}, func())
+}
+
+// ConfigMapObserver is a controller for the "insights-config" configmap
+// in the "openshift-insights" namespace.
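+//
+// A rough wiring sketch, mirroring how the operator starts it (names are
+// illustrative):
+//
+//	kubeInf := v1helpers.NewKubeInformersForNamespaces(kubeClient, "openshift-insights")
+//	configMapObserver, err := NewConfigMapObserver(kubeConfig, eventRecorder, kubeInf)
+//	if err != nil {
+//		return err
+//	}
+//	go kubeInf.Start(ctx.Done())
+//	go configMapObserver.Run(ctx, 1)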
+type ConfigMapObserver struct {
+	factory.Controller
+	lock           sync.Mutex
+	kubeCli        *kubernetes.Clientset
+	insightsConfig *config.InsightsConfiguration
+	listeners      map[chan struct{}]struct{}
+}
+
+func NewConfigMapObserver(kubeConfig *rest.Config,
+	eventRecorder events.Recorder,
+	kubeInformer v1helpers.KubeInformersForNamespaces) (ConfigMapInformer, error) {
+	cmInformer := kubeInformer.InformersFor(insightsNamespaceName).Core().V1().ConfigMaps().Informer()
+	kubeClient, err := kubernetes.NewForConfig(kubeConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	ctrl := &ConfigMapObserver{
+		kubeCli:        kubeClient,
+		insightsConfig: nil,
+		listeners:      make(map[chan struct{}]struct{}),
+	}
+	factoryCtrl := factory.New().WithInformers(cmInformer).
+		WithSync(ctrl.sync).
+		ResyncEvery(10*time.Minute).
+		ToController("ConfigController", eventRecorder)
+
+	ctrl.Controller = factoryCtrl
+	return ctrl, nil
+}
+
+// sync is called by the informer with every config map update
+func (c *ConfigMapObserver) sync(ctx context.Context, _ factory.SyncContext) error {
+	cm, err := getConfigMap(ctx, c.kubeCli)
+	if err != nil {
+		if !errors.IsNotFound(err) {
+			return err
+		}
+		// config map doesn't exist so clear the config, notify and return
+		klog.Info(err)
+		c.insightsConfig = nil
+		c.notifyListeners()
+		return nil
+	}
+	insightsConfig, err := readConfigAndDecode(cm)
+	if err != nil {
+		return err
+	}
+	// do not notify listeners on resync when the content is unchanged;
+	// compare values rather than pointers, because readConfigAndDecode
+	// allocates a new struct on every sync
+	if !reflect.DeepEqual(c.insightsConfig, insightsConfig) {
+		c.insightsConfig = insightsConfig
+		c.notifyListeners()
+	}
+	return nil
+}
+
+func (c *ConfigMapObserver) notifyListeners() {
+	for ch := range c.listeners {
+		if ch == nil {
+			continue
+		}
+		ch <- struct{}{}
+	}
+}
+
+func (c *ConfigMapObserver) Config() *config.InsightsConfiguration {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	return c.insightsConfig
+}
+
+func (c *ConfigMapObserver) ConfigChanged() (configCh <-chan struct{}, closeFn func()) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	ch := make(chan struct{})
+	c.listeners[ch] = struct{}{}
+	return ch, func() {
+		c.lock.Lock()
+		defer c.lock.Unlock()
+		close(ch)
+		delete(c.listeners, ch)
+	}
+}
+
+// readConfigAndDecode gets the "insights-config" config map and tries to decode its content. It returns
+// "config.InsightsConfiguration" when successfully decoded, otherwise an error.
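+//
+// For example, a "config.yaml" data entry such as
+//
+//	dataReporting:
+//	  interval: 1h
+//	  uploadEndpoint: https://example.upload/endpoint
+//
+// decodes into an InsightsConfiguration carrying those two values (the
+// endpoint above is only illustrative).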
+func readConfigAndDecode(cm *v1.ConfigMap) (*config.InsightsConfiguration, error) {
+	insightsConfig := &config.InsightsConfigurationSerialized{}
+	cfg := cm.Data["config.yaml"]
+	err := yaml.Unmarshal([]byte(cfg), insightsConfig)
+	if err != nil {
+		return nil, err
+	}
+	return insightsConfig.ToConfig(), nil
+}
+
+func getConfigMap(ctx context.Context, kubeCli *kubernetes.Clientset) (*v1.ConfigMap, error) {
+	return kubeCli.CoreV1().ConfigMaps(insightsNamespaceName).Get(ctx, insightsConfigMapName, metav1.GetOptions{})
+}
diff --git a/pkg/config/mock_configurator.go b/pkg/config/mock_configurator.go
index 9bbb7f7c3..8ea9b3125 100644
--- a/pkg/config/mock_configurator.go
+++ b/pkg/config/mock_configurator.go
@@ -1,6 +1,8 @@
 package config
 
 import (
+	"context"
+
 	"github.com/openshift/api/config/v1alpha1"
 	"github.com/openshift/insights-operator/pkg/utils"
 	"github.com/openshift/library-go/pkg/controller/factory"
@@ -55,3 +57,34 @@ func (mc *MockAPIConfigurator) GatherDisabled() bool {
 	}
 	return false
 }
+
+func (mc *MockAPIConfigurator) GatherDataPolicy() *v1alpha1.DataPolicy {
+	if mc.config != nil {
+		return &mc.config.DataPolicy
+	}
+	return nil
+}
+
+type MockConfigMapConfigurator struct {
+	factory.Controller
+	insightsConfig *InsightsConfiguration
+}
+
+func NewMockConfigMapConfigurator(config *InsightsConfiguration) *MockConfigMapConfigurator {
+	return &MockConfigMapConfigurator{
+		insightsConfig: config,
+	}
+}
+
+func (m *MockConfigMapConfigurator) Config() *InsightsConfiguration {
+	return m.insightsConfig
+}
+
+func (m *MockConfigMapConfigurator) ConfigChanged() (configCh <-chan struct{}, closeFn func()) {
+	// noop
+	return nil, func() {}
+}
+
+func (m *MockConfigMapConfigurator) Listen(context.Context) {
+}
diff --git a/pkg/config/types.go b/pkg/config/types.go
new file mode 100644
index 000000000..4166b9885
--- /dev/null
+++ b/pkg/config/types.go
@@ -0,0 +1,90 @@
+package config
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/klog/v2"
+)
+
+const defaultGatherPeriod = 2 * time.Hour
+
+// InsightsConfigurationSerialized is a type representing Insights
+// Operator configuration values in JSON/YAML and it is used when decoding
+// the content of the "insights-config" config map.
+type InsightsConfigurationSerialized struct {
+	DataReporting DataReportingSerialized `json:"dataReporting"`
+}
+
+type DataReportingSerialized struct {
+	Interval                    string `json:"interval,omitempty"`
+	UploadEndpoint              string `json:"uploadEndpoint,omitempty"`
+	DownloadEndpoint            string `json:"downloadEndpoint,omitempty"`
+	DownloadEndpointTechPreview string `json:"downloadEndpointTechPreview,omitempty"`
+	StoragePath                 string `json:"storagePath,omitempty"`
+	ConditionalGathererEndpoint string `json:"conditionalGathererEndpoint,omitempty"`
+	ProcessingStatusEndpoint    string `json:"processingStatusEndpoint,omitempty"`
+}
+
+// InsightsConfiguration is a type representing actual Insights
+// Operator configuration options and is used in the code base
+// to make the configuration available.
+type InsightsConfiguration struct {
+	DataReporting DataReporting
+}
+
+// DataReporting is a type including all
+// the configuration options related to Insights data gathering,
+// upload of the data and download of the corresponding Insights analysis report.
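+//
+// Interval is decoded from a Go duration string (see time.ParseDuration),
+// e.g. "2h" or "90m"; an unparsable or non-positive value falls back to the
+// 2h default in ToConfig below.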
+type DataReporting struct {
+	Enabled                     bool
+	Interval                    time.Duration
+	UploadEndpoint              string
+	DownloadEndpoint            string
+	DownloadEndpointTechPreview string
+	StoragePath                 string
+	ConditionalGathererEndpoint string
+	ReportPullingDelay          time.Duration
+	ProcessingStatusEndpoint    string
+}
+
+// ToConfig reads and parses the actual serialized configuration from "InsightsConfigurationSerialized"
+// and returns the "InsightsConfiguration".
+func (i *InsightsConfigurationSerialized) ToConfig() *InsightsConfiguration {
+	ic := &InsightsConfiguration{
+		DataReporting: DataReporting{
+			UploadEndpoint:              i.DataReporting.UploadEndpoint,
+			DownloadEndpoint:            i.DataReporting.DownloadEndpoint,
+			DownloadEndpointTechPreview: i.DataReporting.DownloadEndpointTechPreview,
+			StoragePath:                 i.DataReporting.StoragePath,
+			ConditionalGathererEndpoint: i.DataReporting.ConditionalGathererEndpoint,
+			ProcessingStatusEndpoint:    i.DataReporting.ProcessingStatusEndpoint,
+		},
+	}
+	if i.DataReporting.Interval != "" {
+		interval, err := time.ParseDuration(i.DataReporting.Interval)
+		if err != nil {
+			klog.Errorf("Cannot parse interval time duration: %v. Using default value %s", err, defaultGatherPeriod)
+		}
+		if interval <= 0 {
+			interval = defaultGatherPeriod
+		}
+		ic.DataReporting.Interval = interval
+	}
+	return ic
+}
+
+func (i *InsightsConfiguration) String() string {
+	s := fmt.Sprintf(`interval=%s,
+	upload_endpoint=%s,
+	storage_path=%s,
+	download_endpoint=%s,
+	conditional_gatherer_endpoint=%s`,
+		i.DataReporting.Interval,
+		i.DataReporting.UploadEndpoint,
+		i.DataReporting.StoragePath,
+		i.DataReporting.DownloadEndpoint,
+		i.DataReporting.ConditionalGathererEndpoint,
+	)
+	return s
+}
diff --git a/pkg/controller/gather_commands.go b/pkg/controller/gather_commands.go
index 5c9440299..4f3b0c0d7 100644
--- a/pkg/controller/gather_commands.go
+++ b/pkg/controller/gather_commands.go
@@ -69,6 +69,7 @@ func (g *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res
 
 	// configobserver synthesizes all config into the status reporter controller
 	configObserver := configobserver.New(g.Controller, kubeClient)
+	configAggregator := configobserver.NewStaticConfigAggregator(configObserver, kubeClient)
 
 	// anonymizer is responsible for anonymizing sensitive data, it can be configured to disable specific anonymization
 	anonymizer, err := anonymization.NewAnonymizerFromConfig(
@@ -99,7 +100,7 @@ func (g *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res
 	insightsClient := insightsclient.New(nil, 0, "default", authorizer, gatherConfigClient)
 	createdGatherers := gather.CreateAllGatherers(
 		gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig, anonymizer,
-		configObserver, insightsClient,
+		configAggregator, insightsClient,
 	)
 
 	allFunctionReports := make(map[string]gather.GathererFunctionReport)
@@ -155,8 +156,10 @@ func (g *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
 	if err != nil {
 		return err
 	}
+
 	// configobserver synthesizes all config into the status reporter controller
 	configObserver := configobserver.New(g.Controller, kubeClient)
+	configAggregator := configobserver.NewStaticConfigAggregator(configObserver, kubeClient)
 
 	// anonymizer is responsible for anonymizing sensitive data, it can be configured to disable specific anonymization
 	anonymizer, err := anonymization.NewAnonymizerFromConfig(
 		ctx, gatherKubeConfig, gatherProtoKubeConfig, protoKubeConfig, configObserver, dataGatherCR.Spec.DataPolicy)
@@ -177,9 +180,9 @@ func (g
*GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er createdGatherers := gather.CreateAllGatherers( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig, anonymizer, - configObserver, insightsHTTPCli, + configAggregator, insightsHTTPCli, ) - uploader := insightsuploader.New(nil, insightsHTTPCli, configObserver, nil, nil, 0) + uploader := insightsuploader.New(nil, insightsHTTPCli, configAggregator, nil, nil, 0) dataGatherCR, err = status.UpdateDataGatherState(ctx, insightsV1alphaCli, dataGatherCR, insightsv1alpha1.Running) if err != nil { @@ -227,7 +230,7 @@ func (g *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er } // check if the archive/data was processed - processed, err := wasDataProcessed(ctx, insightsHTTPCli, insightsRequestID, configObserver.Config()) + processed, err := wasDataProcessed(ctx, insightsHTTPCli, insightsRequestID, configAggregator.Config()) dataProcessedCon := status.DataProcessedCondition(metav1.ConditionTrue, "Processed", "") if err != nil || !processed { msg := fmt.Sprintf("Data was not processed in the console.redhat.com pipeline for the request %s", insightsRequestID) @@ -317,15 +320,15 @@ type dataStatus struct { // "insightsRequestID" and tries to parse the response body in case of HTTP 200 response. func wasDataProcessed(ctx context.Context, insightsCli processingStatusClient, - insightsRequestID string, controllerConf *config.Controller) (bool, error) { - delay := controllerConf.ReportPullingDelay + insightsRequestID string, conf *config.InsightsConfiguration) (bool, error) { + delay := conf.DataReporting.ReportPullingDelay retryCounter := 0 klog.V(4).Infof("Initial delay when checking processing status: %v", delay) var resp *http.Response err := wait.PollUntilContextCancel(ctx, delay, false, func(ctx context.Context) (done bool, err error) { resp, err = insightsCli.GetWithPathParams(ctx, // nolint: bodyclose - controllerConf.ProcessingStatusEndpoint, insightsRequestID) // response body is closed later + conf.DataReporting.ProcessingStatusEndpoint, insightsRequestID) // response body is closed later if err != nil { return false, err } diff --git a/pkg/controller/gather_commands_test.go b/pkg/controller/gather_commands_test.go index 9990b0e0f..27c70e2ad 100644 --- a/pkg/controller/gather_commands_test.go +++ b/pkg/controller/gather_commands_test.go @@ -82,10 +82,12 @@ func TestWasDataProcessed(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mockConfCtrl := &config.Controller{ - ReportPullingDelay: 10 * time.Millisecond, + mockConfig := &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + ReportPullingDelay: 10 * time.Millisecond, + }, } - processed, err := wasDataProcessed(context.Background(), &tt.mockClient, "empty", mockConfCtrl) + processed, err := wasDataProcessed(context.Background(), &tt.mockClient, "empty", mockConfig) assert.Equal(t, tt.expectedErr, err) assert.Equal(t, tt.expectedProcessed, processed) }) diff --git a/pkg/controller/operator.go b/pkg/controller/operator.go index 53dbd8657..77f7cf7c7 100644 --- a/pkg/controller/operator.go +++ b/pkg/controller/operator.go @@ -14,6 +14,7 @@ import ( operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1" "github.com/openshift/library-go/pkg/controller/controllercmd" "github.com/openshift/library-go/pkg/operator/configobserver/featuregates" + "github.com/openshift/library-go/pkg/operator/v1helpers" "k8s.io/apimachinery/pkg/api/errors" 
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -150,10 +151,21 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller go insightsInformersfactory.Start(ctx.Done()) } + kubeInf := v1helpers.NewKubeInformersForNamespaces(kubeClient, "openshift-insights") + configMapObserver, err := configobserver.NewConfigMapObserver(gatherKubeConfig, controller.EventRecorder, kubeInf) + if err != nil { + return err + } + go kubeInf.Start(ctx.Done()) + go configMapObserver.Run(ctx, 1) + // secretConfigObserver synthesizes all config into the status reporter controller secretConfigObserver := configobserver.New(s.Controller, kubeClient) go secretConfigObserver.Start(ctx) + configAggregator := configobserver.NewConfigAggregator(secretConfigObserver, configMapObserver) + go configAggregator.Listen(ctx) + // the status controller initializes the cluster operator object and retrieves // the last sync time, if any was set statusReporter := status.NewController(configClient.ConfigV1(), secretConfigObserver, @@ -197,15 +209,15 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller // and provide the results for the recorder gatherers := gather.CreateAllGatherers( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig, anonymizer, - secretConfigObserver, insightsClient, + configAggregator, insightsClient, ) if !insightsConfigAPIEnabled { - periodicGather = periodic.New(secretConfigObserver, rec, gatherers, anonymizer, + periodicGather = periodic.New(configAggregator, rec, gatherers, anonymizer, operatorClient.InsightsOperators(), kubeClient) statusReporter.AddSources(periodicGather.Sources()...) } else { reportRetriever := insightsreport.NewWithTechPreview(insightsClient, secretConfigObserver) - periodicGather = periodic.NewWithTechPreview(reportRetriever, secretConfigObserver, + periodicGather = periodic.NewWithTechPreview(reportRetriever, configAggregator, insightsDataGatherObserver, gatherers, kubeClient, insightClient.InsightsV1alpha1(), operatorClient.InsightsOperators(), dgInformer) statusReporter.AddSources(periodicGather.Sources()...) 
statusReporter.AddSources(reportRetriever) @@ -226,7 +238,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller if !insightsConfigAPIEnabled { // upload results to the provided client - if no client is configured reporting // is permanently disabled, but if a client does exist the server may still disable reporting - uploader := insightsuploader.New(recdriver, insightsClient, secretConfigObserver, + uploader := insightsuploader.New(recdriver, insightsClient, configAggregator, insightsDataGatherObserver, statusReporter, initialDelay) statusReporter.AddSources(uploader) diff --git a/pkg/controller/periodic/periodic.go b/pkg/controller/periodic/periodic.go index e7e236f15..5b9fb14ec 100644 --- a/pkg/controller/periodic/periodic.go +++ b/pkg/controller/periodic/periodic.go @@ -44,8 +44,8 @@ var ( // Controller periodically runs gatherers, records their results to the recorder // and flushes the recorder to create archives type Controller struct { - secretConfigurator configobserver.Configurator apiConfigurator configobserver.InsightsDataGatherObserver + configAggregator configobserver.Interface recorder recorder.FlushInterface gatherers []gatherers.Interface statuses map[string]controllerstatus.StatusController @@ -63,7 +63,7 @@ type Controller struct { func NewWithTechPreview( reportRetriever *insightsreport.Controller, - secretConfigurator configobserver.Configurator, + configAggregator configobserver.Interface, apiConfigurator configobserver.InsightsDataGatherObserver, listGatherers []gatherers.Interface, kubeClient kubernetes.Interface, @@ -78,7 +78,7 @@ func NewWithTechPreview( jobController := NewJobController(kubeClient) return &Controller{ reportRetriever: reportRetriever, - secretConfigurator: secretConfigurator, + configAggregator: configAggregator, apiConfigurator: apiConfigurator, gatherers: listGatherers, statuses: statuses, @@ -95,7 +95,7 @@ func NewWithTechPreview( // New creates a new instance of Controller which periodically invokes the gatherers // and flushes the recorder to create archives. func New( - secretConfigurator configobserver.Configurator, + configAggregator configobserver.Interface, rec recorder.FlushInterface, listGatherers []gatherers.Interface, anonymizer *anonymization.Anonymizer, @@ -110,7 +110,7 @@ func New( } return &Controller{ - secretConfigurator: secretConfigurator, + configAggregator: configAggregator, recorder: rec, gatherers: listGatherers, statuses: statuses, @@ -189,8 +189,8 @@ func (c *Controller) Gather() { gatherersToProcess = append(gatherersToProcess, gatherer) } } - - ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval) + interval := c.configAggregator.Config().DataReporting.Interval + ctx, cancel := context.WithTimeout(context.Background(), interval) defer cancel() allFunctionReports := make(map[string]gather.GathererFunctionReport) @@ -232,30 +232,30 @@ func (c *Controller) Gather() { // Periodically starts the gathering. // If there is an initialDelay set then it waits that much for the first gather to happen. 
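+// The loop is driven by a single time.Ticker, rather than time.After, so that
+// an observed configuration change can shorten or lengthen the gathering
+// period immediately. Illustrative:
+//
+//	t := time.NewTicker(2 * time.Hour)
+//	t.Reset(30 * time.Minute) // after the configured interval changes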
 func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
-	configCh, closeFn := c.secretConfigurator.ConfigChanged()
+	configCh, closeFn := c.configAggregator.ConfigChanged()
 	defer closeFn()
 
-	interval := c.secretConfigurator.Config().Interval
+	interval := c.configAggregator.Config().DataReporting.Interval
 	klog.Infof("Gathering cluster info every %s", interval)
+	klog.Infof("Configuration is %v", c.configAggregator.Config().String())
+	t := time.NewTicker(interval)
 	for {
 		select {
 		case <-stopCh:
+			t.Stop()
 			return
 		case <-configCh:
-			newInterval := c.secretConfigurator.Config().Interval
+			newInterval := c.configAggregator.Config().DataReporting.Interval
 			if newInterval == interval {
 				continue
 			}
+			interval = newInterval
+			t.Reset(interval)
 			klog.Infof("Gathering cluster info every %s", interval)
-
-		case <-time.After(interval):
-			if c.techPreview {
-				c.GatherJob()
-			} else {
-				c.Gather()
-			}
+			klog.Infof("Configuration is %v", c.configAggregator.Config().String())
+		case <-t.C:
+			c.Gather()
 		}
 	}
 }
@@ -271,7 +271,7 @@ func (c *Controller) onDemandGather(stopCh <-chan struct{}) {
 			return
 		case dgName := <-c.dgInf.DataGatherCreated():
 			go func() {
-				ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval*4)
+				ctx, cancel := context.WithTimeout(context.Background(), c.configAggregator.Config().DataReporting.Interval*4)
 				defer cancel()
 
 				state, err := c.dataGatherState(ctx, dgName)
@@ -300,7 +300,7 @@ func (c *Controller) GatherJob() {
 		klog.V(3).Info("Gather is disabled by configuration.")
 		return
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval*4)
+	ctx, cancel := context.WithTimeout(context.Background(), c.configAggregator.Config().DataReporting.Interval*4)
 	defer cancel()
 
 	if c.image == "" {
@@ -329,7 +329,7 @@ func (c *Controller) runJobAndCheckResults(ctx context.Context, dataGatherName string) {
 	// create a new periodic gathering job
-	gj, err := c.jobController.CreateGathererJob(ctx, dataGatherName, c.image, c.secretConfigurator.Config().StoragePath)
+	gj, err := c.jobController.CreateGathererJob(ctx, dataGatherName, c.image, c.configAggregator.Config().DataReporting.StoragePath)
 	if err != nil {
 		klog.Errorf("Failed to create a new job: %v", err)
 		return
@@ -401,7 +401,7 @@ func (c *Controller) updateInsightsReportInDataGather(ctx context.Context,
 		dg.Status.InsightsReport.HealthChecks = append(dg.Status.InsightsReport.HealthChecks, healthCheck)
 	}
 	dg.Status.InsightsReport.DownloadedAt = report.DownloadedAt
-	uri := fmt.Sprintf(c.secretConfigurator.Config().ReportEndpointTechPreview, report.ClusterID, report.RequestID)
+	uri := fmt.Sprintf(c.configAggregator.Config().DataReporting.DownloadEndpointTechPreview, report.ClusterID, report.RequestID)
 	dg.Status.InsightsReport.URI = uri
 	_, err := c.dataGatherClient.DataGathers().UpdateStatus(ctx, dg, metav1.UpdateOptions{})
 	return err
@@ -468,7 +468,7 @@ func (c *Controller) updateOperatorStatusCR(ctx context.Context, allFunctionRepo
 func (c *Controller) isGatheringDisabled() bool {
 	// old way of disabling data gathering by removing
 	// the "cloud.openshift.com" token from the pull-secret
-	if !c.secretConfigurator.Config().Report {
+	if !c.configAggregator.Config().DataReporting.Enabled {
 		return true
 	}
 
diff --git a/pkg/controller/periodic/periodic_test.go b/pkg/controller/periodic/periodic_test.go
index a801adaeb..2c9e7b943 100644
--- a/pkg/controller/periodic/periodic_test.go
+++ b/pkg/controller/periodic/periodic_test.go
@@ -101,9
+101,9 @@ func Test_Controller_periodicTrigger(t *testing.T) { }{ { name: "periodicTrigger finished gathering", - interval: 1 * time.Second, + interval: 2 * time.Second, waitTime: 3 * time.Second, - expectedNumOfRecords: 12, + expectedNumOfRecords: 6, }, { name: "periodicTrigger stopped with no data gathered", @@ -211,13 +211,19 @@ func getMocksForPeriodicTest(listGatherers []gatherers.Interface, interval time. Report: true, Interval: interval, }} + mockConfigMapConfigurator := config.NewMockConfigMapConfigurator(&config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + Enabled: true, + Interval: interval, + }, + }) mockRecorder := recorder.MockRecorder{} mockAnonymizer, err := anonymization.NewAnonymizer("", []string{}, nil, &mockConfigurator, "") if err != nil { return nil, nil, err } fakeInsightsOperatorCli := fakeOperatorCli.NewSimpleClientset().OperatorV1().InsightsOperators() - mockController := New(&mockConfigurator, &mockRecorder, listGatherers, mockAnonymizer, fakeInsightsOperatorCli, nil) + mockController := New(mockConfigMapConfigurator, &mockRecorder, listGatherers, mockAnonymizer, fakeInsightsOperatorCli, nil) return mockController, &mockRecorder, nil } @@ -1075,12 +1081,13 @@ func TestUpdateInsightsReportInDataGather(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { insightsCs := insightsFakeCli.NewSimpleClientset(tt.dataGatherToUpdate) - mockSecretConf := &config.MockSecretConfigurator{ - Conf: &config.Controller{ - ReportEndpointTechPreview: "https://test.report.endpoint.tech.preview.uri/cluster/%s/requestID/%s", + conf := &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + DownloadEndpointTechPreview: "https://test.report.endpoint.tech.preview.uri/cluster/%s/requestID/%s", }, } - mockController := NewWithTechPreview(nil, mockSecretConf, nil, nil, nil, insightsCs.InsightsV1alpha1(), nil, nil) + mockCMConf := config.NewMockConfigMapConfigurator(conf) + mockController := NewWithTechPreview(nil, mockCMConf, nil, nil, nil, insightsCs.InsightsV1alpha1(), nil, nil) err := mockController.updateInsightsReportInDataGather(context.Background(), tt.analysisReport, tt.dataGatherToUpdate) assert.NoError(t, err) assert.Equal(t, tt.expectedInsightsReport, &tt.dataGatherToUpdate.Status.InsightsReport) diff --git a/pkg/gather/gather.go b/pkg/gather/gather.go index 2cd5e5eee..9116bbfcc 100644 --- a/pkg/gather/gather.go +++ b/pkg/gather/gather.go @@ -61,7 +61,7 @@ type ArchiveMetadata struct { // CreateAllGatherers creates all the gatherers func CreateAllGatherers( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig *rest.Config, - anonymizer *anonymization.Anonymizer, configObserver *configobserver.Controller, + anonymizer *anonymization.Anonymizer, configObserver configobserver.Interface, insightsClient *insightsclient.Client, ) []gatherers.Interface { clusterConfigGatherer := clusterconfig.New( diff --git a/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go b/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go index c5d625f5e..3adc7845c 100644 --- a/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go +++ b/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go @@ -20,7 +20,6 @@ type Gatherer struct { alertsGatherKubeConfig *rest.Config anonymizer *anonymization.Anonymizer interval time.Duration - configObserver *configobserver.Controller } // gathererFuncPtr is a type for pointers to functions of Gatherer @@ -92,11 +91,11 @@ var gatheringFunctions = map[string]gathererFuncPtr{ func 
New( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig *rest.Config, - anonymizer *anonymization.Anonymizer, configObserver *configobserver.Controller, + anonymizer *anonymization.Anonymizer, configObserver configobserver.Interface, ) *Gatherer { interval := time.Minute if configObserver != nil && configObserver.Config() != nil { - interval = configObserver.Config().Interval + interval = configObserver.Config().DataReporting.Interval } return &Gatherer{ @@ -106,7 +105,6 @@ func New( alertsGatherKubeConfig: alertsGatherKubeConfig, anonymizer: anonymizer, interval: interval, - configObserver: configObserver, } } diff --git a/pkg/gatherers/clusterconfig/gather_support_secret.go b/pkg/gatherers/clusterconfig/gather_support_secret.go index 231bf070f..a7489e613 100644 --- a/pkg/gatherers/clusterconfig/gather_support_secret.go +++ b/pkg/gatherers/clusterconfig/gather_support_secret.go @@ -2,10 +2,12 @@ package clusterconfig import ( "context" - "fmt" "github.com/openshift/insights-operator/pkg/record" "github.com/openshift/insights-operator/pkg/utils/anonymize" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) // GatherSupportSecret Collects anonymized support secret if there is any @@ -30,19 +32,25 @@ import ( // // ### Changes // None -func (g *Gatherer) GatherSupportSecret(context.Context) ([]record.Record, []error) { - if g.configObserver == nil { - return nil, []error{fmt.Errorf("configObserver is nil")} +func (g *Gatherer) GatherSupportSecret(ctx context.Context) ([]record.Record, []error) { + gatherKubeClient, err := kubernetes.NewForConfig(g.gatherKubeConfig) + if err != nil { + return nil, []error{err} } - if supportSecret := g.configObserver.SupportSecret(); supportSecret != nil && supportSecret.Data != nil { - return []record.Record{{ - Name: "config/secrets/openshift-config/support/data", - Item: record.JSONMarshaller{Object: anonymizeSecretData(supportSecret.Data)}, - }}, nil + return gatherSupportSecret(ctx, gatherKubeClient.CoreV1()) +} + +func gatherSupportSecret(ctx context.Context, cli v1.CoreV1Interface) ([]record.Record, []error) { + supportSecret, err := cli.Secrets("openshift-config").Get(ctx, "support", metav1.GetOptions{}) + if err != nil { + return nil, []error{err} } - return nil, nil + return []record.Record{{ + Name: "config/secrets/openshift-config/support/data", + Item: record.JSONMarshaller{Object: anonymizeSecretData(supportSecret.Data)}, + }}, nil } func anonymizeSecretData(data map[string][]byte) map[string][]byte { diff --git a/pkg/gatherers/clusterconfig/gather_support_secret_test.go b/pkg/gatherers/clusterconfig/gather_support_secret_test.go index e3676b3da..167dcc6cc 100644 --- a/pkg/gatherers/clusterconfig/gather_support_secret_test.go +++ b/pkg/gatherers/clusterconfig/gather_support_secret_test.go @@ -9,8 +9,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubefake "k8s.io/client-go/kubernetes/fake" - "github.com/openshift/insights-operator/pkg/config" - "github.com/openshift/insights-operator/pkg/config/configobserver" "github.com/openshift/insights-operator/pkg/record" ) @@ -26,12 +24,8 @@ func Test_gatherSupportSecret(t *testing.T) { }, }, metav1.CreateOptions{}) assert.NoError(t, err) - configObserver := configobserver.New(config.Controller{}, kubeClient) - gatherer := New( - nil, nil, nil, nil, nil, configObserver, - ) - records, errs := gatherer.GatherSupportSecret(context.TODO()) + records, errs := 
gatherSupportSecret(context.Background(), kubeClient.CoreV1()) assert.Empty(t, errs) assert.Len(t, records, 1) assert.Equal(t, record.Record{ diff --git a/pkg/gatherers/conditional/conditional_gatherer.go b/pkg/gatherers/conditional/conditional_gatherer.go index 2b9eabd03..90fc77ea4 100644 --- a/pkg/gatherers/conditional/conditional_gatherer.go +++ b/pkg/gatherers/conditional/conditional_gatherer.go @@ -36,7 +36,7 @@ type Gatherer struct { firingAlerts map[string][]AlertLabels gatheringRules GatheringRules clusterVersion string - configurator configobserver.Configurator + configurator configobserver.Interface gatheringRulesServiceClient GatheringRulesServiceClient } @@ -47,7 +47,7 @@ type GatheringRulesServiceClient interface { // New creates a new instance of conditional gatherer with the appropriate configs func New( gatherProtoKubeConfig, metricsGatherKubeConfig, gatherKubeConfig *rest.Config, - configurator configobserver.Configurator, gatheringRulesServiceClient GatheringRulesServiceClient, + configurator configobserver.Interface, gatheringRulesServiceClient GatheringRulesServiceClient, ) *Gatherer { var imageKubeConfig *rest.Config if gatherProtoKubeConfig != nil { @@ -212,7 +212,7 @@ func (g *Gatherer) getRulesEndpoint() (string, error) { return "", fmt.Errorf("config is nil") } - return config.ConditionalGathererEndpoint, nil + return config.DataReporting.ConditionalGathererEndpoint, nil } // updateCache updates alerts and version caches diff --git a/pkg/gatherers/conditional/conditional_gatherer_test.go b/pkg/gatherers/conditional/conditional_gatherer_test.go index 23e12b95b..e70b0e4b8 100644 --- a/pkg/gatherers/conditional/conditional_gatherer_test.go +++ b/pkg/gatherers/conditional/conditional_gatherer_test.go @@ -294,12 +294,18 @@ func newEmptyGatherer(gathererConfig string) *Gatherer { // nolint:gocritic }] }` } + testConf := &config.InsightsConfiguration{ + DataReporting: config.DataReporting{ + ConditionalGathererEndpoint: "/gathering_rules", + }, + } + mockConfigurator := config.NewMockConfigMapConfigurator(testConf) return New( nil, nil, nil, - &config.MockSecretConfigurator{Conf: &config.Controller{ConditionalGathererEndpoint: "/gathering_rules"}}, + mockConfigurator, &MockGatheringRulesServiceClient{Conf: gathererConfig}, ) } diff --git a/pkg/insights/insightsuploader/insightsuploader.go b/pkg/insights/insightsuploader/insightsuploader.go index f260a74a0..2636a1028 100644 --- a/pkg/insights/insightsuploader/insightsuploader.go +++ b/pkg/insights/insightsuploader/insightsuploader.go @@ -33,35 +33,35 @@ type StatusReporter interface { type Controller struct { controllerstatus.StatusController - summarizer Summarizer - client *insightsclient.Client - secretConfigurator configobserver.Configurator - apiConfigurator configobserver.InsightsDataGatherObserver - reporter StatusReporter - archiveUploaded chan struct{} - initialDelay time.Duration - backoff wait.Backoff + summarizer Summarizer + client *insightsclient.Client + configurator configobserver.Interface + apiConfigurator configobserver.InsightsDataGatherObserver + reporter StatusReporter + archiveUploaded chan struct{} + initialDelay time.Duration + backoff wait.Backoff } func New(summarizer Summarizer, client *insightsclient.Client, - secretconfigurator configobserver.Configurator, + configurator configobserver.Interface, apiConfigurator configobserver.InsightsDataGatherObserver, statusReporter StatusReporter, initialDelay time.Duration) *Controller { ctrl := &Controller{ - StatusController: 
controllerstatus.New("insightsuploader"), - summarizer: summarizer, - secretConfigurator: secretconfigurator, - apiConfigurator: apiConfigurator, - client: client, - reporter: statusReporter, - archiveUploaded: make(chan struct{}), - initialDelay: initialDelay, + StatusController: controllerstatus.New("insightsuploader"), + summarizer: summarizer, + configurator: configurator, + apiConfigurator: apiConfigurator, + client: client, + reporter: statusReporter, + archiveUploaded: make(chan struct{}), + initialDelay: initialDelay, } ctrl.backoff = wait.Backoff{ - Duration: ctrl.secretConfigurator.Config().Interval / 4, // 30 min as first wait by default + Duration: ctrl.configurator.Config().DataReporting.Interval / 4, // 30 min as first wait by default Steps: 4, Factor: 2, } @@ -77,13 +77,9 @@ func (c *Controller) Run(ctx context.Context) { } // the controller periodically uploads results to the remote insights endpoint - cfg := c.secretConfigurator.Config() - configCh, cancelFn := c.secretConfigurator.ConfigChanged() - defer cancelFn() + cfg := c.configurator.Config() - reportingEnabled := cfg.Report - endpoint := cfg.Endpoint - interval := cfg.Interval + interval := cfg.DataReporting.Interval lastReported := c.reporter.LastReportedTime() if !lastReported.IsZero() { next := lastReported.Add(interval) @@ -91,90 +87,122 @@ func (c *Controller) Run(ctx context.Context) { c.initialDelay = wait.Jitter(now.Sub(next), 1.2) } } - klog.V(2).Infof("Reporting status periodically to %s every %s, starting in %s", cfg.Endpoint, interval, c.initialDelay.Truncate(time.Second)) - - wait.Until(func() { - if c.initialDelay > 0 { - select { - case <-ctx.Done(): - case <-time.After(c.initialDelay): - case <-configCh: - newCfg := c.secretConfigurator.Config() - interval = newCfg.Interval - endpoint = newCfg.Endpoint - reportingEnabled = newCfg.Report - var disabledInAPI bool - if c.apiConfigurator != nil { - disabledInAPI = c.apiConfigurator.GatherDisabled() - } - if !reportingEnabled || disabledInAPI { - klog.V(2).Infof("Reporting was disabled") - c.initialDelay = newCfg.Interval - return - } - } - c.initialDelay = 0 - } + klog.V(2).Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.initialDelay.Truncate(time.Second)) + go wait.Until(func() { c.periodicTrigger(ctx.Done()) }, 5*time.Second, ctx.Done()) +} - // attempt to get a summary to send to the server - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() +func (c *Controller) periodicTrigger(stopCh <-chan struct{}) { + klog.Infof("Checking archives to upload periodically every %s", c.initialDelay) + lastReported := c.reporter.LastReportedTime() + cfg := c.configurator.Config() + interval := cfg.DataReporting.Interval + endpoint := cfg.DataReporting.UploadEndpoint + var disabledInAPI bool + if c.apiConfigurator != nil { + disabledInAPI = c.apiConfigurator.GatherDisabled() + } + reportingEnabled := cfg.DataReporting.Enabled && !disabledInAPI - source, ok, err := c.summarizer.Summary(ctx, lastReported) - if err != nil { - c.StatusController.UpdateStatus(controllerstatus.Summary{Reason: "SummaryFailed", Message: fmt.Sprintf("Unable to retrieve local insights data: %v", err)}) - return - } - if !ok { - klog.V(4).Infof("Nothing to report since %s", lastReported.Format(time.RFC3339)) + configCh, cancelFn := c.configurator.ConfigChanged() + defer cancelFn() + + if c.initialDelay == 0 { + c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled) + return 
+	}
+	ticker := time.NewTicker(c.initialDelay)
+	for {
+		select {
+		case <-stopCh:
+			ticker.Stop()
+			return
+		case <-ticker.C:
+			c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled)
+			ticker.Reset(c.initialDelay)
 			return
-		}
-		defer source.Contents.Close()
-		if reportingEnabled && len(endpoint) > 0 {
-			// send the results
-			start := time.Now()
-			id := start.Format(time.RFC3339)
-			klog.V(4).Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
-			source.ID = id
-			source.Type = "application/vnd.redhat.openshift.periodic"
-			if err := c.client.Send(ctx, endpoint, *source); err != nil {
-				klog.V(2).Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
-				if err == insightsclient.ErrWaitingForVersion {
-					c.initialDelay = wait.Jitter(time.Second*15, 1)
-					return
-				}
-				if authorizer.IsAuthorizationError(err) {
-					c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
-						Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
-					c.initialDelay = wait.Jitter(interval/2, 2)
-					return
-				}
-
-				c.initialDelay = wait.Jitter(interval/8, 1.2)
-				c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
-					Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
-				return
+		case <-configCh:
+			newCfg := c.configurator.Config()
+			endpoint = newCfg.DataReporting.UploadEndpoint
+			reportingEnabled = newCfg.DataReporting.Enabled
+			var disabledInAPI bool
+			if c.apiConfigurator != nil {
+				disabledInAPI = c.apiConfigurator.GatherDisabled()
 			}
-			klog.V(4).Infof("Uploaded report successfully in %s", time.Since(start))
-			select {
-			case c.archiveUploaded <- struct{}{}:
-			default:
+			if !reportingEnabled || disabledInAPI {
+				klog.V(2).Infof("Reporting was disabled")
+				c.initialDelay = newCfg.DataReporting.Interval
+				return
 			}
-			lastReported = start.UTC()
-			c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
-		} else {
-			klog.V(4).Info("Display report that would be sent")
-			// display what would have been sent (to ensure we always exercise source processing)
-			if err := reportToLogs(source.Contents, klog.V(4)); err != nil {
-				klog.Errorf("Unable to log upload: %v", err)
+			newInterval := newCfg.DataReporting.Interval
+			if newInterval == interval {
+				continue
 			}
-			// we didn't actually report logs, so don't advance the report date
+			interval = newInterval
+			// there's no return in this case, so set the initial delay and the ticker again
+			c.initialDelay = wait.Jitter(interval/8, 0.1)
+			ticker.Reset(c.initialDelay)
 		}
+	}
+}
+
+func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported time.Time, endpoint string, reportingEnabled bool) {
+	// attempt to get a summary to send to the server
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
 
-		c.reporter.SetLastReportedTime(lastReported)
+	source, ok, err := c.summarizer.Summary(ctx, lastReported)
+	if err != nil {
+		c.StatusController.UpdateStatus(controllerstatus.Summary{Reason: "SummaryFailed", Message: fmt.Sprintf("Unable to retrieve local insights data: %v", err)})
+		return
+	}
+	if !ok {
+		klog.V(4).Infof("Nothing to report since %s", lastReported.Format(time.RFC3339))
+		return
+	}
+	defer source.Contents.Close()
+	if reportingEnabled && len(endpoint) > 0 {
+		// send the results
+		start := time.Now()
+		id := start.Format(time.RFC3339)
+		klog.V(4).Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
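+		// the RFC3339 timestamp of this attempt also serves as the upload id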
source.ID = id + source.Type = "application/vnd.redhat.openshift.periodic" + if err := c.client.Send(ctx, endpoint, *source); err != nil { + klog.V(2).Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err) + if err == insightsclient.ErrWaitingForVersion { + c.initialDelay = wait.Jitter(time.Second*15, 1) + return + } + if authorizer.IsAuthorizationError(err) { + c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading, + Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)}) + c.initialDelay = wait.Jitter(interval/2, 2) + + return + } + + c.initialDelay = wait.Jitter(interval/8, 1.2) + c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading, + Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)}) + return + } + klog.V(4).Infof("Uploaded report successfully in %s", time.Since(start)) + select { + case c.archiveUploaded <- struct{}{}: + default: + } + lastReported = start.UTC() + c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true}) + } else { + klog.V(4).Info("Display report that would be sent") + // display what would have been sent (to ensure we always exercise source processing) + if err := reportToLogs(source.Contents, klog.V(4)); err != nil { + klog.Errorf("Unable to log upload: %v", err) + } + // we didn't actually report logs, so don't advance the report date + } - c.initialDelay = wait.Jitter(interval, 1.2) - }, 15*time.Second, ctx.Done()) + c.reporter.SetLastReportedTime(lastReported) + c.initialDelay = wait.Jitter(interval/8, 0.1) } // ArchiveUploaded returns a channel that indicates when an archive is uploaded @@ -192,7 +220,7 @@ func (c *Controller) Upload(ctx context.Context, s *insightsclient.Source) (stri var requestID string var statusCode int err := wait.ExponentialBackoff(c.backoff, func() (done bool, err error) { - requestID, statusCode, err = c.client.SendAndGetID(ctx, c.secretConfigurator.Config().Endpoint, *s) + requestID, statusCode, err = c.client.SendAndGetID(ctx, c.configurator.Config().DataReporting.UploadEndpoint, *s) if err != nil { // do no return the error if it's not the last attempt if c.backoff.Steps > 1 {