diff --git a/manifests/03-clusterrole.yaml b/manifests/03-clusterrole.yaml index 56c38945a..0e2a71831 100644 --- a/manifests/03-clusterrole.yaml +++ b/manifests/03-clusterrole.yaml @@ -103,15 +103,6 @@ rules: - get - list - delete - - apiGroups: - - "" - resources: - - configmaps - verbs: - - get - - list - - watch - --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding @@ -307,13 +298,6 @@ rules: verbs: - get - list - - apiGroups: - - "" - resources: - - secrets - verbs: - - get - - list --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/pkg/config/configobserver/config_aggregator.go b/pkg/config/configobserver/config_aggregator.go deleted file mode 100644 index 941fa2308..000000000 --- a/pkg/config/configobserver/config_aggregator.go +++ /dev/null @@ -1,205 +0,0 @@ -package configobserver - -import ( - "context" - "sync" - - "github.com/openshift/insights-operator/pkg/config" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" -) - -const insightsNamespaceName = "openshift-insights" - -type Interface interface { - Config() *config.InsightsConfiguration - ConfigChanged() (<-chan struct{}, func()) - Listen(ctx context.Context) -} - -// ConfigAggregator is an auxiliary structure that should obviate the need for the use of -// legacy secret configurator and the new config map informer -type ConfigAggregator struct { - lock sync.Mutex - legacyConfigurator Configurator - configMapInformer ConfigMapInformer - config *config.InsightsConfiguration - listeners map[chan struct{}]struct{} - usingInformer bool - kubeClient kubernetes.Interface -} - -func NewConfigAggregator(ctrl Configurator, configMapInf ConfigMapInformer) Interface { - confAggreg := &ConfigAggregator{ - legacyConfigurator: ctrl, - configMapInformer: configMapInf, - listeners: make(map[chan struct{}]struct{}), - usingInformer: true, - } - confAggreg.mergeUsingInformer() - return confAggreg -} - -// NewStaticConfigAggregator is a constructor used mainly for the techpreview configuration reading. -// There is no reason to create and start any informer in the techpreview when data gathering runs as a job. -// It is sufficient to read the config once when the job is created and/or starting. -func NewStaticConfigAggregator(ctrl Configurator, cli kubernetes.Interface) Interface { - confAggreg := &ConfigAggregator{ - legacyConfigurator: ctrl, - configMapInformer: nil, - kubeClient: cli, - } - - confAggreg.mergeStatically() - return confAggreg -} - -// mergeUsingInformer merges config values for the legacy "support" secret configuration and -// from the new configmap informer. The "insights-config" configmap always takes -// precedence if it exists and is not empty. -func (c *ConfigAggregator) mergeUsingInformer() { - c.lock.Lock() - defer c.lock.Unlock() - newConf := c.configMapInformer.Config() - conf := c.legacyConfigToInsightsConfiguration() - - if newConf == nil { - c.config = conf - return - } - - c.merge(conf, newConf) -} - -// mergeStatically merges config values for the legacy "support" secret configuration and -// from the "insights-config" configmap by getting and reading the confimap directly without -// using an informer. -func (c *ConfigAggregator) mergeStatically() { - c.lock.Lock() - defer c.lock.Unlock() - conf := c.legacyConfigToInsightsConfiguration() - c.config = conf - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - cm, err := c.kubeClient.CoreV1().ConfigMaps(insightsNamespaceName).Get(ctx, insightsConfigMapName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return - } - klog.Error(err) - } - - cmConf, err := readConfigAndDecode(cm) - if err != nil { - klog.Error("Failed to read configmap configuration: %v", err) - return - } - - c.merge(conf, cmConf) -} - -func (c *ConfigAggregator) merge(defaultCfg, newCfg *config.InsightsConfiguration) { - // read config map values and merge - if newCfg.DataReporting.Interval != 0 { - defaultCfg.DataReporting.Interval = newCfg.DataReporting.Interval - } - - if newCfg.DataReporting.UploadEndpoint != "" { - defaultCfg.DataReporting.UploadEndpoint = newCfg.DataReporting.UploadEndpoint - } - - if newCfg.DataReporting.DownloadEndpoint != "" { - defaultCfg.DataReporting.DownloadEndpoint = newCfg.DataReporting.DownloadEndpoint - } - - if newCfg.DataReporting.DownloadEndpointTechPreview != "" { - defaultCfg.DataReporting.DownloadEndpointTechPreview = newCfg.DataReporting.DownloadEndpointTechPreview - } - - if newCfg.DataReporting.ProcessingStatusEndpoint != "" { - defaultCfg.DataReporting.ProcessingStatusEndpoint = newCfg.DataReporting.ProcessingStatusEndpoint - } - - if newCfg.DataReporting.ConditionalGathererEndpoint != "" { - defaultCfg.DataReporting.ConditionalGathererEndpoint = newCfg.DataReporting.ConditionalGathererEndpoint - } - - if newCfg.DataReporting.StoragePath != "" { - defaultCfg.DataReporting.StoragePath = newCfg.DataReporting.StoragePath - } - c.config = defaultCfg -} - -func (c *ConfigAggregator) Config() *config.InsightsConfiguration { - if c.usingInformer { - c.mergeUsingInformer() - } else { - c.mergeStatically() - } - return c.config -} - -// Listen listens to the legacy Secret configurator/observer as well as the -// new config map informer. When any configuration change is observed then all the listeners -// are notified. -func (c *ConfigAggregator) Listen(ctx context.Context) { - legacyCh, legacyCloseFn := c.legacyConfigurator.ConfigChanged() - cmCh, cmICloseFn := c.configMapInformer.ConfigChanged() - defer func() { - legacyCloseFn() - cmICloseFn() - }() - - for { - select { - case <-legacyCh: - c.notifyListeners() - case <-cmCh: - c.notifyListeners() - case <-ctx.Done(): - return - } - } -} - -func (c *ConfigAggregator) notifyListeners() { - for ch := range c.listeners { - ch <- struct{}{} - } -} - -func (c *ConfigAggregator) ConfigChanged() (configCh <-chan struct{}, closeFn func()) { - c.lock.Lock() - defer c.lock.Unlock() - ch := make(chan struct{}, 1) - c.listeners[ch] = struct{}{} - return ch, func() { - c.lock.Lock() - defer c.lock.Unlock() - close(ch) - delete(c.listeners, ch) - } -} - -func (c *ConfigAggregator) legacyConfigToInsightsConfiguration() *config.InsightsConfiguration { - legacyConfig := c.legacyConfigurator.Config() - return &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Interval: legacyConfig.Interval, - UploadEndpoint: legacyConfig.Endpoint, - DownloadEndpoint: legacyConfig.ReportEndpoint, - ConditionalGathererEndpoint: legacyConfig.ConditionalGathererEndpoint, - ProcessingStatusEndpoint: legacyConfig.ProcessingStatusEndpoint, - DownloadEndpointTechPreview: legacyConfig.ReportEndpointTechPreview, - // This can't be overridden by the config map - it's not merged in the merge function. - // The value is based on the presence of the token in the pull-secret and the config map - // doesn't know anything about secrets - Enabled: legacyConfig.Report, - StoragePath: legacyConfig.StoragePath, - ReportPullingDelay: legacyConfig.ReportPullingDelay, - }, - } -} diff --git a/pkg/config/configobserver/config_aggregator_test.go b/pkg/config/configobserver/config_aggregator_test.go deleted file mode 100644 index 67534efae..000000000 --- a/pkg/config/configobserver/config_aggregator_test.go +++ /dev/null @@ -1,239 +0,0 @@ -package configobserver - -import ( - "testing" - "time" - - "github.com/openshift/insights-operator/pkg/config" - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubefake "k8s.io/client-go/kubernetes/fake" -) - -func TestMergeStatically(t *testing.T) { - tests := []struct { - name string - configCM *v1.ConfigMap - legacyConfig config.Controller - expectedConfig *config.InsightsConfiguration - }{ - { - name: "No config map exists - legacy config is used", - configCM: nil, - legacyConfig: config.Controller{ - Report: true, - StoragePath: "/foo/bar/", - Endpoint: "http://testing.here", - ReportEndpoint: "http://reportendpoint.here", - Interval: 2 * time.Hour, - ConditionalGathererEndpoint: "http://conditionalendpoint.here", - }, - expectedConfig: &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: true, - UploadEndpoint: "http://testing.here", - StoragePath: "/foo/bar/", - DownloadEndpoint: "http://reportendpoint.here", - Interval: 2 * time.Hour, - ConditionalGathererEndpoint: "http://conditionalendpoint.here", - }, - }, - }, - { - name: "Config map exists and overrides legacy config", - configCM: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: insightsConfigMapName, - Namespace: "openshift-insights", - }, - Data: map[string]string{ - "config.yaml": ` -dataReporting: - interval: 1h - uploadEndpoint: https://overriden.upload/endpoint - storagePath: /var/lib/test - downloadEndpoint: https://overriden.download/endpoint - conditionalGathererEndpoint: https://overriden.conditional/endpoint - processingStatusEndpoint: https://overriden.status/endpoint - downloadEndpointTechPreview: https://overriden.downloadtechpreview/endpoint`, - }, - }, - legacyConfig: config.Controller{ - Report: true, - StoragePath: "/foo/bar/", - Endpoint: "http://testing.here", - ReportEndpoint: "http://reportendpoint.here", - Interval: 2 * time.Hour, - ConditionalGathererEndpoint: "http://conditionalendpoint.here", - ProcessingStatusEndpoint: "http://statusendpoint.here", - ReportEndpointTechPreview: "http://downloadtpendpoint.here", - }, - expectedConfig: &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: true, - UploadEndpoint: "https://overriden.upload/endpoint", - StoragePath: "/var/lib/test", - DownloadEndpoint: "https://overriden.download/endpoint", - Interval: 1 * time.Hour, - ConditionalGathererEndpoint: "https://overriden.conditional/endpoint", - ProcessingStatusEndpoint: "https://overriden.status/endpoint", - DownloadEndpointTechPreview: "https://overriden.downloadtechpreview/endpoint", - }, - }, - }, - { - name: "Config map cannot override \"Report\" bool attribute", - configCM: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: insightsConfigMapName, - Namespace: "openshift-insights", - }, - Data: map[string]string{ - "config.yaml": ` -dataReporting: - enabled: true - uploadEndpoint: https://overriden.upload/endpoint`, - }, - }, - legacyConfig: config.Controller{ - Report: false, - Endpoint: "http://testing.here", - }, - expectedConfig: &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: false, - UploadEndpoint: "https://overriden.upload/endpoint", - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var cs *kubefake.Clientset - if tt.configCM != nil { - cs = kubefake.NewSimpleClientset(tt.configCM) - } else { - cs = kubefake.NewSimpleClientset() - } - mockSecretConf := config.NewMockSecretConfigurator(&tt.legacyConfig) - staticAggregator := NewStaticConfigAggregator(mockSecretConf, cs) - - testConfig := staticAggregator.Config() - assert.Equal(t, tt.expectedConfig, testConfig) - }) - } -} - -func TestMergeUsingInformer(t *testing.T) { - tests := []struct { - name string - configFromInf config.InsightsConfiguration - legacyConfig config.Controller - expectedConfig *config.InsightsConfiguration - }{ - { - name: "No config map exists - legacy config is used", - configFromInf: config.InsightsConfiguration{}, - legacyConfig: config.Controller{ - Report: true, - StoragePath: "/foo/bar/", - Endpoint: "http://testing.here", - ReportEndpoint: "http://reportendpoint.here", - Interval: 2 * time.Hour, - ConditionalGathererEndpoint: "http://conditionalendpoint.here", - }, - expectedConfig: &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: true, - UploadEndpoint: "http://testing.here", - StoragePath: "/foo/bar/", - DownloadEndpoint: "http://reportendpoint.here", - Interval: 2 * time.Hour, - ConditionalGathererEndpoint: "http://conditionalendpoint.here", - }, - }, - }, - { - name: "Config map exists and overrides legacy config", - configFromInf: config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Interval: 1 * time.Hour, - UploadEndpoint: "https://overriden.upload/endpoint", - StoragePath: "/var/lib/test", - DownloadEndpoint: "https://overriden.download/endpoint", - ConditionalGathererEndpoint: "https://overriden.conditional/endpoint", - }, - }, - legacyConfig: config.Controller{ - Report: true, - StoragePath: "/foo/bar/", - Endpoint: "http://testing.here", - ReportEndpoint: "http://reportendpoint.here", - Interval: 2 * time.Hour, - ConditionalGathererEndpoint: "http://conditionalendpoint.here", - }, - expectedConfig: &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: true, - UploadEndpoint: "https://overriden.upload/endpoint", - StoragePath: "/var/lib/test", - DownloadEndpoint: "https://overriden.download/endpoint", - Interval: 1 * time.Hour, - ConditionalGathererEndpoint: "https://overriden.conditional/endpoint", - }, - }, - }, - { - name: "Config map cannot override \"Report\" bool attribute", - configFromInf: config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: true, - UploadEndpoint: "https://overriden.upload/endpoint", - }, - }, - legacyConfig: config.Controller{ - Report: false, - Endpoint: "http://testing.here", - }, - expectedConfig: &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: false, - UploadEndpoint: "https://overriden.upload/endpoint", - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mockSecretConf := config.NewMockSecretConfigurator(&tt.legacyConfig) - mockConfigMapInf := NewMockConfigMapInformer(&tt.configFromInf) - informerAggregator := NewConfigAggregator(mockSecretConf, mockConfigMapInf) - - testConfig := informerAggregator.Config() - assert.Equal(t, tt.expectedConfig, testConfig) - }) - } -} - -type MockConfigMapInformer struct { - factory.Controller - config *config.InsightsConfiguration -} - -func NewMockConfigMapInformer(cfg *config.InsightsConfiguration) *MockConfigMapInformer { - return &MockConfigMapInformer{ - config: cfg, - } -} - -func (m *MockConfigMapInformer) Config() *config.InsightsConfiguration { - return m.config -} - -func (m *MockConfigMapInformer) ConfigChanged() (configCh <-chan struct{}, closeFn func()) { - return nil, nil -} diff --git a/pkg/config/configobserver/configmapobserver.go b/pkg/config/configobserver/configmapobserver.go deleted file mode 100644 index 14ceeccfc..000000000 --- a/pkg/config/configobserver/configmapobserver.go +++ /dev/null @@ -1,133 +0,0 @@ -package configobserver - -import ( - "context" - "sync" - "time" - - "github.com/openshift/insights-operator/pkg/config" - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" - "github.com/openshift/library-go/pkg/operator/v1helpers" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/klog/v2" -) - -const ( - insightsConfigMapName = "insights-config" -) - -type ConfigMapInformer interface { - factory.Controller - // Config provides actual Insights configuration values from the "insights-config" configmap - Config() *config.InsightsConfiguration - // ConfigChanged notifies all the listeners that the content of the "insights-config" configmap has changed - ConfigChanged() (<-chan struct{}, func()) -} - -// ConfigMapObserver is a controller for "insights-config" config map -// in the "openshift-insights" namespace. -type ConfigMapObserver struct { - factory.Controller - lock sync.Mutex - kubeCli *kubernetes.Clientset - insightsConfig *config.InsightsConfiguration - listeners map[chan struct{}]struct{} -} - -func NewConfigMapObserver(kubeConfig *rest.Config, - eventRecorder events.Recorder, - kubeInformer v1helpers.KubeInformersForNamespaces) (ConfigMapInformer, error) { - cmInformer := kubeInformer.InformersFor(insightsNamespaceName).Core().V1().ConfigMaps().Informer() - kubeClient, err := kubernetes.NewForConfig(kubeConfig) - if err != nil { - return nil, err - } - - ctrl := &ConfigMapObserver{ - kubeCli: kubeClient, - insightsConfig: nil, - listeners: make(map[chan struct{}]struct{}), - } - factoryCtrl := factory.New().WithInformers(cmInformer). - WithSync(ctrl.sync). - ResyncEvery(10*time.Minute). - ToController("ConfigController", eventRecorder) - - ctrl.Controller = factoryCtrl - return ctrl, nil -} - -// sync is called by the informer with every config map update -func (c *ConfigMapObserver) sync(ctx context.Context, _ factory.SyncContext) error { - cm, err := getConfigMap(ctx, c.kubeCli) - if err != nil { - if !errors.IsNotFound(err) { - return err - } - // config map doesn't exist so clear the config, notify and return - klog.Info(err) - c.insightsConfig = nil - c.notifyListeners() - return nil - } - insightsConfig, err := readConfigAndDecode(cm) - if err != nil { - return err - } - // do not notify listeners on resync - if c.insightsConfig != insightsConfig { - c.insightsConfig = insightsConfig - c.notifyListeners() - } - return nil -} - -func (c *ConfigMapObserver) notifyListeners() { - for ch := range c.listeners { - if ch == nil { - continue - } - ch <- struct{}{} - } -} - -func (c *ConfigMapObserver) Config() *config.InsightsConfiguration { - c.lock.Lock() - defer c.lock.Unlock() - return c.insightsConfig -} - -func (c *ConfigMapObserver) ConfigChanged() (configCh <-chan struct{}, closeFn func()) { - c.lock.Lock() - defer c.lock.Unlock() - ch := make(chan struct{}) - c.listeners[ch] = struct{}{} - return ch, func() { - c.lock.Lock() - defer c.lock.Unlock() - close(ch) - delete(c.listeners, ch) - } -} - -// readConfigAndDecode gets the "insights-config" config map and tries to decode its content. It returns -// "config.InsightsConfiguration" when successfully decoded, otherwise an error. -func readConfigAndDecode(cm *v1.ConfigMap) (*config.InsightsConfiguration, error) { - insightsConfig := &config.InsightsConfigurationSerialized{} - cfg := cm.Data["config.yaml"] - err := yaml.Unmarshal([]byte(cfg), insightsConfig) - if err != nil { - return nil, err - } - return insightsConfig.ToConfig(), nil -} - -func getConfigMap(ctx context.Context, kubeCli *kubernetes.Clientset) (*v1.ConfigMap, error) { - return kubeCli.CoreV1().ConfigMaps(insightsNamespaceName).Get(ctx, insightsConfigMapName, metav1.GetOptions{}) -} diff --git a/pkg/config/mock_configurator.go b/pkg/config/mock_configurator.go index 8ea9b3125..9bbb7f7c3 100644 --- a/pkg/config/mock_configurator.go +++ b/pkg/config/mock_configurator.go @@ -1,8 +1,6 @@ package config import ( - "context" - "github.com/openshift/api/config/v1alpha1" "github.com/openshift/insights-operator/pkg/utils" "github.com/openshift/library-go/pkg/controller/factory" @@ -57,34 +55,3 @@ func (mc *MockAPIConfigurator) GatherDisabled() bool { } return false } - -func (mc *MockAPIConfigurator) GatherDataPolicy() *v1alpha1.DataPolicy { - if mc.config != nil { - return &mc.config.DataPolicy - } - return nil -} - -type MockConfigMapConfigurator struct { - factory.Controller - insightsConfig *InsightsConfiguration -} - -func NewMockConfigMapConfigurator(config *InsightsConfiguration) *MockConfigMapConfigurator { - return &MockConfigMapConfigurator{ - insightsConfig: config, - } -} - -func (m *MockConfigMapConfigurator) Config() *InsightsConfiguration { - return m.insightsConfig -} - -func (m *MockConfigMapConfigurator) ConfigChanged() (configCh <-chan struct{}, closeFn func()) { - // noop - return nil, func() {} -} - -func (m *MockConfigMapConfigurator) Listen(context.Context) { - -} diff --git a/pkg/config/types.go b/pkg/config/types.go deleted file mode 100644 index 4166b9885..000000000 --- a/pkg/config/types.go +++ /dev/null @@ -1,90 +0,0 @@ -package config - -import ( - "fmt" - "time" - - "k8s.io/klog/v2" -) - -const defaultGatherPeriod = 2 * time.Hour - -// InsightsConfigurationSerialized is a type representing Insights -// Operator configuration values in JSON/YAML and it is when decoding -// the content of the "insights-config" config map. -type InsightsConfigurationSerialized struct { - DataReporting DataReportingSerialized `json:"dataReporting"` -} - -type DataReportingSerialized struct { - Interval string `json:"interval,omitempty"` - UploadEndpoint string `json:"uploadEndpoint,omitempty"` - DownloadEndpoint string `json:"downloadEndpoint,omitempty"` - DownloadEndpointTechPreview string `json:"downloadEndpointTechPreview,omitempty"` - StoragePath string `json:"storagePath,omitempty"` - ConditionalGathererEndpoint string `json:"conditionalGathererEndpoint,omitempty"` - ProcessingStatusEndpoint string `json:"processingStatusEndpoint"` -} - -// InsightsConfiguration is a type representing actual Insights -// Operator configuration options and is used in the code base -// to make the configuration available. -type InsightsConfiguration struct { - DataReporting DataReporting -} - -// DataReporting is a type including all -// the configuration options related to Insights data gathering, -// upload of the data and download of the corresponding Insights analysis report. -type DataReporting struct { - Enabled bool - Interval time.Duration - UploadEndpoint string - DownloadEndpoint string - DownloadEndpointTechPreview string - StoragePath string - ConditionalGathererEndpoint string - ReportPullingDelay time.Duration - ProcessingStatusEndpoint string -} - -// ToConfig reads and pareses the actual serialized configuration from "InsightsConfigurationSerialized" -// and returns the "InsightsConfiguration". -func (i *InsightsConfigurationSerialized) ToConfig() *InsightsConfiguration { - ic := &InsightsConfiguration{ - DataReporting: DataReporting{ - UploadEndpoint: i.DataReporting.UploadEndpoint, - DownloadEndpoint: i.DataReporting.DownloadEndpoint, - DownloadEndpointTechPreview: i.DataReporting.DownloadEndpointTechPreview, - StoragePath: i.DataReporting.StoragePath, - ConditionalGathererEndpoint: i.DataReporting.ConditionalGathererEndpoint, - ProcessingStatusEndpoint: i.DataReporting.ProcessingStatusEndpoint, - }, - } - if i.DataReporting.Interval != "" { - interval, err := time.ParseDuration(i.DataReporting.Interval) - if err != nil { - klog.Errorf("Cannot parse interval time duration: %v. Using default value %s", err, defaultGatherPeriod) - } - if interval <= 0 { - interval = defaultGatherPeriod - } - ic.DataReporting.Interval = interval - } - return ic -} - -func (i *InsightsConfiguration) String() string { - s := fmt.Sprintf(`interval=%s, - upload_endpoint=%s, - storage_path=%s, - download_endpoint=%s, - conditional_gatherer_endpoint=%s`, - i.DataReporting.Interval, - i.DataReporting.UploadEndpoint, - i.DataReporting.StoragePath, - i.DataReporting.DownloadEndpoint, - i.DataReporting.ConditionalGathererEndpoint, - ) - return s -} diff --git a/pkg/controller/gather_commands.go b/pkg/controller/gather_commands.go index a91eadce1..fcd0d9227 100644 --- a/pkg/controller/gather_commands.go +++ b/pkg/controller/gather_commands.go @@ -69,7 +69,6 @@ func (g *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res // configobserver synthesizes all config into the status reporter controller configObserver := configobserver.New(g.Controller, kubeClient) - configAggregator := configobserver.NewStaticConfigAggregator(configObserver, kubeClient) // anonymizer is responsible for anonymizing sensitive data, it can be configured to disable specific anonymization anonymizer, err := anonymization.NewAnonymizerFromConfig( @@ -100,7 +99,7 @@ func (g *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res insightsClient := insightsclient.New(nil, 0, "default", authorizer, gatherConfigClient) createdGatherers := gather.CreateAllGatherers( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig, anonymizer, - configAggregator, insightsClient, + configObserver, insightsClient, ) allFunctionReports := make(map[string]gather.GathererFunctionReport) @@ -156,10 +155,8 @@ func (g *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er if err != nil { return err } - // configobserver synthesizes all config into the status reporter controller configObserver := configobserver.New(g.Controller, kubeClient) - configAggregator := configobserver.NewStaticConfigAggregator(configObserver, kubeClient) // anonymizer is responsible for anonymizing sensitive data, it can be configured to disable specific anonymization anonymizer, err := anonymization.NewAnonymizerFromConfig( ctx, gatherKubeConfig, gatherProtoKubeConfig, protoKubeConfig, configObserver, dataGatherCR.Spec.DataPolicy) @@ -180,9 +177,9 @@ func (g *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er createdGatherers := gather.CreateAllGatherers( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig, anonymizer, - configAggregator, insightsHTTPCli, + configObserver, insightsHTTPCli, ) - uploader := insightsuploader.New(nil, insightsHTTPCli, configAggregator, nil, nil, 0) + uploader := insightsuploader.New(nil, insightsHTTPCli, configObserver, nil, nil, 0) dataGatherCR, err = status.UpdateDataGatherState(ctx, insightsV1alphaCli, dataGatherCR, insightsv1alpha1.Running) if err != nil { @@ -230,7 +227,7 @@ func (g *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er } // check if the archive/data was processed - processed, err := wasDataProcessed(ctx, insightsHTTPCli, insightsRequestID, configAggregator.Config()) + processed, err := wasDataProcessed(ctx, insightsHTTPCli, insightsRequestID, configObserver.Config()) dataProcessedCon := status.DataProcessedCondition(metav1.ConditionTrue, "Processed", "") if err != nil || !processed { msg := fmt.Sprintf("Data was not processed in the console.redhat.com pipeline for the request %s", insightsRequestID) @@ -320,15 +317,15 @@ type dataStatus struct { // "insightsRequestID" and tries to parse the response body in case of HTTP 200 response. func wasDataProcessed(ctx context.Context, insightsCli processingStatusClient, - insightsRequestID string, conf *config.InsightsConfiguration) (bool, error) { - delay := conf.DataReporting.ReportPullingDelay + insightsRequestID string, controllerConf *config.Controller) (bool, error) { + delay := controllerConf.ReportPullingDelay retryCounter := 0 klog.V(4).Infof("Initial delay when checking processing status: %v", delay) var resp *http.Response err := wait.PollUntilContextCancel(ctx, delay, false, func(ctx context.Context) (done bool, err error) { resp, err = insightsCli.GetWithPathParams(ctx, // nolint: bodyclose - conf.DataReporting.ProcessingStatusEndpoint, insightsRequestID) // response body is closed later + controllerConf.ProcessingStatusEndpoint, insightsRequestID) // response body is closed later if err != nil { return false, err } diff --git a/pkg/controller/gather_commands_test.go b/pkg/controller/gather_commands_test.go index 7b84332d3..d67fbf5c8 100644 --- a/pkg/controller/gather_commands_test.go +++ b/pkg/controller/gather_commands_test.go @@ -94,12 +94,10 @@ func TestWasDataProcessed(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mockConfig := &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - ReportPullingDelay: 10 * time.Millisecond, - }, + mockConfCtrl := &config.Controller{ + ReportPullingDelay: 10 * time.Millisecond, } - processed, err := wasDataProcessed(context.Background(), &tt.mockClient, "empty", mockConfig) + processed, err := wasDataProcessed(context.Background(), &tt.mockClient, "empty", mockConfCtrl) assert.Equal(t, tt.expectedErr, err) assert.Equal(t, tt.expectedProcessed, processed) }) diff --git a/pkg/controller/operator.go b/pkg/controller/operator.go index 77f7cf7c7..53dbd8657 100644 --- a/pkg/controller/operator.go +++ b/pkg/controller/operator.go @@ -14,7 +14,6 @@ import ( operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1" "github.com/openshift/library-go/pkg/controller/controllercmd" "github.com/openshift/library-go/pkg/operator/configobserver/featuregates" - "github.com/openshift/library-go/pkg/operator/v1helpers" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -151,21 +150,10 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller go insightsInformersfactory.Start(ctx.Done()) } - kubeInf := v1helpers.NewKubeInformersForNamespaces(kubeClient, "openshift-insights") - configMapObserver, err := configobserver.NewConfigMapObserver(gatherKubeConfig, controller.EventRecorder, kubeInf) - if err != nil { - return err - } - go kubeInf.Start(ctx.Done()) - go configMapObserver.Run(ctx, 1) - // secretConfigObserver synthesizes all config into the status reporter controller secretConfigObserver := configobserver.New(s.Controller, kubeClient) go secretConfigObserver.Start(ctx) - configAggregator := configobserver.NewConfigAggregator(secretConfigObserver, configMapObserver) - go configAggregator.Listen(ctx) - // the status controller initializes the cluster operator object and retrieves // the last sync time, if any was set statusReporter := status.NewController(configClient.ConfigV1(), secretConfigObserver, @@ -209,15 +197,15 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller // and provide the results for the recorder gatherers := gather.CreateAllGatherers( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig, anonymizer, - configAggregator, insightsClient, + secretConfigObserver, insightsClient, ) if !insightsConfigAPIEnabled { - periodicGather = periodic.New(configAggregator, rec, gatherers, anonymizer, + periodicGather = periodic.New(secretConfigObserver, rec, gatherers, anonymizer, operatorClient.InsightsOperators(), kubeClient) statusReporter.AddSources(periodicGather.Sources()...) } else { reportRetriever := insightsreport.NewWithTechPreview(insightsClient, secretConfigObserver) - periodicGather = periodic.NewWithTechPreview(reportRetriever, configAggregator, + periodicGather = periodic.NewWithTechPreview(reportRetriever, secretConfigObserver, insightsDataGatherObserver, gatherers, kubeClient, insightClient.InsightsV1alpha1(), operatorClient.InsightsOperators(), dgInformer) statusReporter.AddSources(periodicGather.Sources()...) statusReporter.AddSources(reportRetriever) @@ -238,7 +226,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller if !insightsConfigAPIEnabled { // upload results to the provided client - if no client is configured reporting // is permanently disabled, but if a client does exist the server may still disable reporting - uploader := insightsuploader.New(recdriver, insightsClient, configAggregator, + uploader := insightsuploader.New(recdriver, insightsClient, secretConfigObserver, insightsDataGatherObserver, statusReporter, initialDelay) statusReporter.AddSources(uploader) diff --git a/pkg/controller/periodic/periodic.go b/pkg/controller/periodic/periodic.go index 5b9fb14ec..e7e236f15 100644 --- a/pkg/controller/periodic/periodic.go +++ b/pkg/controller/periodic/periodic.go @@ -44,8 +44,8 @@ var ( // Controller periodically runs gatherers, records their results to the recorder // and flushes the recorder to create archives type Controller struct { + secretConfigurator configobserver.Configurator apiConfigurator configobserver.InsightsDataGatherObserver - configAggregator configobserver.Interface recorder recorder.FlushInterface gatherers []gatherers.Interface statuses map[string]controllerstatus.StatusController @@ -63,7 +63,7 @@ type Controller struct { func NewWithTechPreview( reportRetriever *insightsreport.Controller, - configAggregator configobserver.Interface, + secretConfigurator configobserver.Configurator, apiConfigurator configobserver.InsightsDataGatherObserver, listGatherers []gatherers.Interface, kubeClient kubernetes.Interface, @@ -78,7 +78,7 @@ func NewWithTechPreview( jobController := NewJobController(kubeClient) return &Controller{ reportRetriever: reportRetriever, - configAggregator: configAggregator, + secretConfigurator: secretConfigurator, apiConfigurator: apiConfigurator, gatherers: listGatherers, statuses: statuses, @@ -95,7 +95,7 @@ func NewWithTechPreview( // New creates a new instance of Controller which periodically invokes the gatherers // and flushes the recorder to create archives. func New( - configAggregator configobserver.Interface, + secretConfigurator configobserver.Configurator, rec recorder.FlushInterface, listGatherers []gatherers.Interface, anonymizer *anonymization.Anonymizer, @@ -110,7 +110,7 @@ func New( } return &Controller{ - configAggregator: configAggregator, + secretConfigurator: secretConfigurator, recorder: rec, gatherers: listGatherers, statuses: statuses, @@ -189,8 +189,8 @@ func (c *Controller) Gather() { gatherersToProcess = append(gatherersToProcess, gatherer) } } - interval := c.configAggregator.Config().DataReporting.Interval - ctx, cancel := context.WithTimeout(context.Background(), interval) + + ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval) defer cancel() allFunctionReports := make(map[string]gather.GathererFunctionReport) @@ -232,30 +232,30 @@ func (c *Controller) Gather() { // Periodically starts the gathering. // If there is an initialDelay set then it waits that much for the first gather to happen. func (c *Controller) periodicTrigger(stopCh <-chan struct{}) { - configCh, closeFn := c.configAggregator.ConfigChanged() + configCh, closeFn := c.secretConfigurator.ConfigChanged() defer closeFn() - interval := c.configAggregator.Config().DataReporting.Interval + interval := c.secretConfigurator.Config().Interval klog.Infof("Gathering cluster info every %s", interval) - klog.Infof("Configuration is %v", c.configAggregator.Config().String()) - t := time.NewTicker(interval) for { select { case <-stopCh: - t.Stop() return + case <-configCh: - newInterval := c.configAggregator.Config().DataReporting.Interval + newInterval := c.secretConfigurator.Config().Interval if newInterval == interval { continue } - interval = newInterval - t.Reset(interval) klog.Infof("Gathering cluster info every %s", interval) - klog.Infof("Configuration is %v", c.configAggregator.Config().String()) - case <-t.C: - c.Gather() + + case <-time.After(interval): + if c.techPreview { + c.GatherJob() + } else { + c.Gather() + } } } } @@ -271,7 +271,7 @@ func (c *Controller) onDemandGather(stopCh <-chan struct{}) { return case dgName := <-c.dgInf.DataGatherCreated(): go func() { - ctx, cancel := context.WithTimeout(context.Background(), c.configAggregator.Config().DataReporting.Interval*4) + ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval*4) defer cancel() state, err := c.dataGatherState(ctx, dgName) @@ -300,7 +300,7 @@ func (c *Controller) GatherJob() { klog.V(3).Info("Gather is disabled by configuration.") return } - ctx, cancel := context.WithTimeout(context.Background(), c.configAggregator.Config().DataReporting.Interval*4) + ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval*4) defer cancel() if c.image == "" { @@ -329,7 +329,7 @@ func (c *Controller) GatherJob() { func (c *Controller) runJobAndCheckResults(ctx context.Context, dataGatherName string) { // create a new periodic gathering job - gj, err := c.jobController.CreateGathererJob(ctx, dataGatherName, c.image, c.configAggregator.Config().DataReporting.StoragePath) + gj, err := c.jobController.CreateGathererJob(ctx, dataGatherName, c.image, c.secretConfigurator.Config().StoragePath) if err != nil { klog.Errorf("Failed to create a new job: %v", err) return @@ -401,7 +401,7 @@ func (c *Controller) updateInsightsReportInDataGather(ctx context.Context, dg.Status.InsightsReport.HealthChecks = append(dg.Status.InsightsReport.HealthChecks, healthCheck) } dg.Status.InsightsReport.DownloadedAt = report.DownloadedAt - uri := fmt.Sprintf(c.configAggregator.Config().DataReporting.DownloadEndpointTechPreview, report.ClusterID, report.RequestID) + uri := fmt.Sprintf(c.secretConfigurator.Config().ReportEndpointTechPreview, report.ClusterID, report.RequestID) dg.Status.InsightsReport.URI = uri _, err := c.dataGatherClient.DataGathers().UpdateStatus(ctx, dg, metav1.UpdateOptions{}) return err @@ -468,7 +468,7 @@ func (c *Controller) updateOperatorStatusCR(ctx context.Context, allFunctionRepo func (c *Controller) isGatheringDisabled() bool { // old way of disabling data gathering by removing // the "cloud.openshift.com" token from the pull-secret - if !c.configAggregator.Config().DataReporting.Enabled { + if !c.secretConfigurator.Config().Report { return true } diff --git a/pkg/controller/periodic/periodic_test.go b/pkg/controller/periodic/periodic_test.go index 2c9e7b943..a801adaeb 100644 --- a/pkg/controller/periodic/periodic_test.go +++ b/pkg/controller/periodic/periodic_test.go @@ -101,9 +101,9 @@ func Test_Controller_periodicTrigger(t *testing.T) { }{ { name: "periodicTrigger finished gathering", - interval: 2 * time.Second, + interval: 1 * time.Second, waitTime: 3 * time.Second, - expectedNumOfRecords: 6, + expectedNumOfRecords: 12, }, { name: "periodicTrigger stopped with no data gathered", @@ -211,19 +211,13 @@ func getMocksForPeriodicTest(listGatherers []gatherers.Interface, interval time. Report: true, Interval: interval, }} - mockConfigMapConfigurator := config.NewMockConfigMapConfigurator(&config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - Enabled: true, - Interval: interval, - }, - }) mockRecorder := recorder.MockRecorder{} mockAnonymizer, err := anonymization.NewAnonymizer("", []string{}, nil, &mockConfigurator, "") if err != nil { return nil, nil, err } fakeInsightsOperatorCli := fakeOperatorCli.NewSimpleClientset().OperatorV1().InsightsOperators() - mockController := New(mockConfigMapConfigurator, &mockRecorder, listGatherers, mockAnonymizer, fakeInsightsOperatorCli, nil) + mockController := New(&mockConfigurator, &mockRecorder, listGatherers, mockAnonymizer, fakeInsightsOperatorCli, nil) return mockController, &mockRecorder, nil } @@ -1081,13 +1075,12 @@ func TestUpdateInsightsReportInDataGather(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { insightsCs := insightsFakeCli.NewSimpleClientset(tt.dataGatherToUpdate) - conf := &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - DownloadEndpointTechPreview: "https://test.report.endpoint.tech.preview.uri/cluster/%s/requestID/%s", + mockSecretConf := &config.MockSecretConfigurator{ + Conf: &config.Controller{ + ReportEndpointTechPreview: "https://test.report.endpoint.tech.preview.uri/cluster/%s/requestID/%s", }, } - mockCMConf := config.NewMockConfigMapConfigurator(conf) - mockController := NewWithTechPreview(nil, mockCMConf, nil, nil, nil, insightsCs.InsightsV1alpha1(), nil, nil) + mockController := NewWithTechPreview(nil, mockSecretConf, nil, nil, nil, insightsCs.InsightsV1alpha1(), nil, nil) err := mockController.updateInsightsReportInDataGather(context.Background(), tt.analysisReport, tt.dataGatherToUpdate) assert.NoError(t, err) assert.Equal(t, tt.expectedInsightsReport, &tt.dataGatherToUpdate.Status.InsightsReport) diff --git a/pkg/gather/gather.go b/pkg/gather/gather.go index 9116bbfcc..2cd5e5eee 100644 --- a/pkg/gather/gather.go +++ b/pkg/gather/gather.go @@ -61,7 +61,7 @@ type ArchiveMetadata struct { // CreateAllGatherers creates all the gatherers func CreateAllGatherers( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig *rest.Config, - anonymizer *anonymization.Anonymizer, configObserver configobserver.Interface, + anonymizer *anonymization.Anonymizer, configObserver *configobserver.Controller, insightsClient *insightsclient.Client, ) []gatherers.Interface { clusterConfigGatherer := clusterconfig.New( diff --git a/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go b/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go index 3adc7845c..c5d625f5e 100644 --- a/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go +++ b/pkg/gatherers/clusterconfig/clusterconfig_gatherer.go @@ -20,6 +20,7 @@ type Gatherer struct { alertsGatherKubeConfig *rest.Config anonymizer *anonymization.Anonymizer interval time.Duration + configObserver *configobserver.Controller } // gathererFuncPtr is a type for pointers to functions of Gatherer @@ -91,11 +92,11 @@ var gatheringFunctions = map[string]gathererFuncPtr{ func New( gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig *rest.Config, - anonymizer *anonymization.Anonymizer, configObserver configobserver.Interface, + anonymizer *anonymization.Anonymizer, configObserver *configobserver.Controller, ) *Gatherer { interval := time.Minute if configObserver != nil && configObserver.Config() != nil { - interval = configObserver.Config().DataReporting.Interval + interval = configObserver.Config().Interval } return &Gatherer{ @@ -105,6 +106,7 @@ func New( alertsGatherKubeConfig: alertsGatherKubeConfig, anonymizer: anonymizer, interval: interval, + configObserver: configObserver, } } diff --git a/pkg/gatherers/clusterconfig/gather_support_secret.go b/pkg/gatherers/clusterconfig/gather_support_secret.go index a7489e613..231bf070f 100644 --- a/pkg/gatherers/clusterconfig/gather_support_secret.go +++ b/pkg/gatherers/clusterconfig/gather_support_secret.go @@ -2,12 +2,10 @@ package clusterconfig import ( "context" + "fmt" "github.com/openshift/insights-operator/pkg/record" "github.com/openshift/insights-operator/pkg/utils/anonymize" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) // GatherSupportSecret Collects anonymized support secret if there is any @@ -32,25 +30,19 @@ import ( // // ### Changes // None -func (g *Gatherer) GatherSupportSecret(ctx context.Context) ([]record.Record, []error) { - gatherKubeClient, err := kubernetes.NewForConfig(g.gatherKubeConfig) - if err != nil { - return nil, []error{err} +func (g *Gatherer) GatherSupportSecret(context.Context) ([]record.Record, []error) { + if g.configObserver == nil { + return nil, []error{fmt.Errorf("configObserver is nil")} } - return gatherSupportSecret(ctx, gatherKubeClient.CoreV1()) -} - -func gatherSupportSecret(ctx context.Context, cli v1.CoreV1Interface) ([]record.Record, []error) { - supportSecret, err := cli.Secrets("openshift-config").Get(ctx, "support", metav1.GetOptions{}) - if err != nil { - return nil, []error{err} + if supportSecret := g.configObserver.SupportSecret(); supportSecret != nil && supportSecret.Data != nil { + return []record.Record{{ + Name: "config/secrets/openshift-config/support/data", + Item: record.JSONMarshaller{Object: anonymizeSecretData(supportSecret.Data)}, + }}, nil } - return []record.Record{{ - Name: "config/secrets/openshift-config/support/data", - Item: record.JSONMarshaller{Object: anonymizeSecretData(supportSecret.Data)}, - }}, nil + return nil, nil } func anonymizeSecretData(data map[string][]byte) map[string][]byte { diff --git a/pkg/gatherers/clusterconfig/gather_support_secret_test.go b/pkg/gatherers/clusterconfig/gather_support_secret_test.go index 167dcc6cc..e3676b3da 100644 --- a/pkg/gatherers/clusterconfig/gather_support_secret_test.go +++ b/pkg/gatherers/clusterconfig/gather_support_secret_test.go @@ -9,6 +9,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubefake "k8s.io/client-go/kubernetes/fake" + "github.com/openshift/insights-operator/pkg/config" + "github.com/openshift/insights-operator/pkg/config/configobserver" "github.com/openshift/insights-operator/pkg/record" ) @@ -24,8 +26,12 @@ func Test_gatherSupportSecret(t *testing.T) { }, }, metav1.CreateOptions{}) assert.NoError(t, err) + configObserver := configobserver.New(config.Controller{}, kubeClient) + gatherer := New( + nil, nil, nil, nil, nil, configObserver, + ) - records, errs := gatherSupportSecret(context.Background(), kubeClient.CoreV1()) + records, errs := gatherer.GatherSupportSecret(context.TODO()) assert.Empty(t, errs) assert.Len(t, records, 1) assert.Equal(t, record.Record{ diff --git a/pkg/gatherers/conditional/conditional_gatherer.go b/pkg/gatherers/conditional/conditional_gatherer.go index 90fc77ea4..2b9eabd03 100644 --- a/pkg/gatherers/conditional/conditional_gatherer.go +++ b/pkg/gatherers/conditional/conditional_gatherer.go @@ -36,7 +36,7 @@ type Gatherer struct { firingAlerts map[string][]AlertLabels gatheringRules GatheringRules clusterVersion string - configurator configobserver.Interface + configurator configobserver.Configurator gatheringRulesServiceClient GatheringRulesServiceClient } @@ -47,7 +47,7 @@ type GatheringRulesServiceClient interface { // New creates a new instance of conditional gatherer with the appropriate configs func New( gatherProtoKubeConfig, metricsGatherKubeConfig, gatherKubeConfig *rest.Config, - configurator configobserver.Interface, gatheringRulesServiceClient GatheringRulesServiceClient, + configurator configobserver.Configurator, gatheringRulesServiceClient GatheringRulesServiceClient, ) *Gatherer { var imageKubeConfig *rest.Config if gatherProtoKubeConfig != nil { @@ -212,7 +212,7 @@ func (g *Gatherer) getRulesEndpoint() (string, error) { return "", fmt.Errorf("config is nil") } - return config.DataReporting.ConditionalGathererEndpoint, nil + return config.ConditionalGathererEndpoint, nil } // updateCache updates alerts and version caches diff --git a/pkg/gatherers/conditional/conditional_gatherer_test.go b/pkg/gatherers/conditional/conditional_gatherer_test.go index e70b0e4b8..23e12b95b 100644 --- a/pkg/gatherers/conditional/conditional_gatherer_test.go +++ b/pkg/gatherers/conditional/conditional_gatherer_test.go @@ -294,18 +294,12 @@ func newEmptyGatherer(gathererConfig string) *Gatherer { // nolint:gocritic }] }` } - testConf := &config.InsightsConfiguration{ - DataReporting: config.DataReporting{ - ConditionalGathererEndpoint: "/gathering_rules", - }, - } - mockConfigurator := config.NewMockConfigMapConfigurator(testConf) return New( nil, nil, nil, - mockConfigurator, + &config.MockSecretConfigurator{Conf: &config.Controller{ConditionalGathererEndpoint: "/gathering_rules"}}, &MockGatheringRulesServiceClient{Conf: gathererConfig}, ) } diff --git a/pkg/insights/insightsuploader/insightsuploader.go b/pkg/insights/insightsuploader/insightsuploader.go index 2636a1028..f260a74a0 100644 --- a/pkg/insights/insightsuploader/insightsuploader.go +++ b/pkg/insights/insightsuploader/insightsuploader.go @@ -33,35 +33,35 @@ type StatusReporter interface { type Controller struct { controllerstatus.StatusController - summarizer Summarizer - client *insightsclient.Client - configurator configobserver.Interface - apiConfigurator configobserver.InsightsDataGatherObserver - reporter StatusReporter - archiveUploaded chan struct{} - initialDelay time.Duration - backoff wait.Backoff + summarizer Summarizer + client *insightsclient.Client + secretConfigurator configobserver.Configurator + apiConfigurator configobserver.InsightsDataGatherObserver + reporter StatusReporter + archiveUploaded chan struct{} + initialDelay time.Duration + backoff wait.Backoff } func New(summarizer Summarizer, client *insightsclient.Client, - configurator configobserver.Interface, + secretconfigurator configobserver.Configurator, apiConfigurator configobserver.InsightsDataGatherObserver, statusReporter StatusReporter, initialDelay time.Duration) *Controller { ctrl := &Controller{ - StatusController: controllerstatus.New("insightsuploader"), - summarizer: summarizer, - configurator: configurator, - apiConfigurator: apiConfigurator, - client: client, - reporter: statusReporter, - archiveUploaded: make(chan struct{}), - initialDelay: initialDelay, + StatusController: controllerstatus.New("insightsuploader"), + summarizer: summarizer, + secretConfigurator: secretconfigurator, + apiConfigurator: apiConfigurator, + client: client, + reporter: statusReporter, + archiveUploaded: make(chan struct{}), + initialDelay: initialDelay, } ctrl.backoff = wait.Backoff{ - Duration: ctrl.configurator.Config().DataReporting.Interval / 4, // 30 min as first wait by default + Duration: ctrl.secretConfigurator.Config().Interval / 4, // 30 min as first wait by default Steps: 4, Factor: 2, } @@ -77,9 +77,13 @@ func (c *Controller) Run(ctx context.Context) { } // the controller periodically uploads results to the remote insights endpoint - cfg := c.configurator.Config() + cfg := c.secretConfigurator.Config() + configCh, cancelFn := c.secretConfigurator.ConfigChanged() + defer cancelFn() - interval := cfg.DataReporting.Interval + reportingEnabled := cfg.Report + endpoint := cfg.Endpoint + interval := cfg.Interval lastReported := c.reporter.LastReportedTime() if !lastReported.IsZero() { next := lastReported.Add(interval) @@ -87,122 +91,90 @@ func (c *Controller) Run(ctx context.Context) { c.initialDelay = wait.Jitter(now.Sub(next), 1.2) } } - klog.V(2).Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.initialDelay.Truncate(time.Second)) - go wait.Until(func() { c.periodicTrigger(ctx.Done()) }, 5*time.Second, ctx.Done()) -} - -func (c *Controller) periodicTrigger(stopCh <-chan struct{}) { - klog.Infof("Checking archives to upload periodically every %s", c.initialDelay) - lastReported := c.reporter.LastReportedTime() - cfg := c.configurator.Config() - interval := cfg.DataReporting.Interval - endpoint := cfg.DataReporting.UploadEndpoint - var disabledInAPI bool - if c.apiConfigurator != nil { - disabledInAPI = c.apiConfigurator.GatherDisabled() - } - reportingEnabled := cfg.DataReporting.Enabled && !disabledInAPI - - configCh, cancelFn := c.configurator.ConfigChanged() - defer cancelFn() - - if c.initialDelay == 0 { - c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled) - return - } - ticker := time.NewTicker(c.initialDelay) - for { - select { - case <-stopCh: - ticker.Stop() - case <-ticker.C: - c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled) - ticker.Reset(c.initialDelay) - return - case <-configCh: - newCfg := c.configurator.Config() - endpoint = newCfg.DataReporting.UploadEndpoint - reportingEnabled = newCfg.DataReporting.Enabled - var disabledInAPI bool - if c.apiConfigurator != nil { - disabledInAPI = c.apiConfigurator.GatherDisabled() - } - if !reportingEnabled || disabledInAPI { - klog.V(2).Infof("Reporting was disabled") - c.initialDelay = newCfg.DataReporting.Interval - return - } - newInterval := newCfg.DataReporting.Interval - if newInterval == interval { - continue + klog.V(2).Infof("Reporting status periodically to %s every %s, starting in %s", cfg.Endpoint, interval, c.initialDelay.Truncate(time.Second)) + + wait.Until(func() { + if c.initialDelay > 0 { + select { + case <-ctx.Done(): + case <-time.After(c.initialDelay): + case <-configCh: + newCfg := c.secretConfigurator.Config() + interval = newCfg.Interval + endpoint = newCfg.Endpoint + reportingEnabled = newCfg.Report + var disabledInAPI bool + if c.apiConfigurator != nil { + disabledInAPI = c.apiConfigurator.GatherDisabled() + } + if !reportingEnabled || disabledInAPI { + klog.V(2).Infof("Reporting was disabled") + c.initialDelay = newCfg.Interval + return + } } - interval = newInterval - // there's no return in this case so set the initial delay again - c.initialDelay = wait.Jitter(interval/8, 0.1) - ticker.Reset(c.initialDelay) + c.initialDelay = 0 } - } -} -func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported time.Time, endpoint string, reportingEnabled bool) { - // attempt to get a summary to send to the server - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() + // attempt to get a summary to send to the server + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() - source, ok, err := c.summarizer.Summary(ctx, lastReported) - if err != nil { - c.StatusController.UpdateStatus(controllerstatus.Summary{Reason: "SummaryFailed", Message: fmt.Sprintf("Unable to retrieve local insights data: %v", err)}) - return - } - if !ok { - klog.V(4).Infof("Nothing to report since %s", lastReported.Format(time.RFC3339)) - return - } - defer source.Contents.Close() - if reportingEnabled && len(endpoint) > 0 { - // send the results - start := time.Now() - id := start.Format(time.RFC3339) - klog.V(4).Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339)) - source.ID = id - source.Type = "application/vnd.redhat.openshift.periodic" - if err := c.client.Send(ctx, endpoint, *source); err != nil { - klog.V(2).Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err) - if err == insightsclient.ErrWaitingForVersion { - c.initialDelay = wait.Jitter(time.Second*15, 1) - return - } - if authorizer.IsAuthorizationError(err) { - c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading, - Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)}) - c.initialDelay = wait.Jitter(interval/2, 2) - - return - } - - c.initialDelay = wait.Jitter(interval/8, 1.2) - c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading, - Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)}) + source, ok, err := c.summarizer.Summary(ctx, lastReported) + if err != nil { + c.StatusController.UpdateStatus(controllerstatus.Summary{Reason: "SummaryFailed", Message: fmt.Sprintf("Unable to retrieve local insights data: %v", err)}) return } - klog.V(4).Infof("Uploaded report successfully in %s", time.Since(start)) - select { - case c.archiveUploaded <- struct{}{}: - default: + if !ok { + klog.V(4).Infof("Nothing to report since %s", lastReported.Format(time.RFC3339)) + return } - lastReported = start.UTC() - c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true}) - } else { - klog.V(4).Info("Display report that would be sent") - // display what would have been sent (to ensure we always exercise source processing) - if err := reportToLogs(source.Contents, klog.V(4)); err != nil { - klog.Errorf("Unable to log upload: %v", err) + defer source.Contents.Close() + if reportingEnabled && len(endpoint) > 0 { + // send the results + start := time.Now() + id := start.Format(time.RFC3339) + klog.V(4).Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339)) + source.ID = id + source.Type = "application/vnd.redhat.openshift.periodic" + if err := c.client.Send(ctx, endpoint, *source); err != nil { + klog.V(2).Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err) + if err == insightsclient.ErrWaitingForVersion { + c.initialDelay = wait.Jitter(time.Second*15, 1) + return + } + if authorizer.IsAuthorizationError(err) { + c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading, + Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)}) + c.initialDelay = wait.Jitter(interval/2, 2) + return + } + + c.initialDelay = wait.Jitter(interval/8, 1.2) + c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading, + Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)}) + return + } + klog.V(4).Infof("Uploaded report successfully in %s", time.Since(start)) + select { + case c.archiveUploaded <- struct{}{}: + default: + } + lastReported = start.UTC() + c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true}) + } else { + klog.V(4).Info("Display report that would be sent") + // display what would have been sent (to ensure we always exercise source processing) + if err := reportToLogs(source.Contents, klog.V(4)); err != nil { + klog.Errorf("Unable to log upload: %v", err) + } + // we didn't actually report logs, so don't advance the report date } - // we didn't actually report logs, so don't advance the report date - } - c.reporter.SetLastReportedTime(lastReported) - c.initialDelay = wait.Jitter(interval/8, 0.1) + c.reporter.SetLastReportedTime(lastReported) + + c.initialDelay = wait.Jitter(interval, 1.2) + }, 15*time.Second, ctx.Done()) } // ArchiveUploaded returns a channel that indicates when an archive is uploaded @@ -220,7 +192,7 @@ func (c *Controller) Upload(ctx context.Context, s *insightsclient.Source) (stri var requestID string var statusCode int err := wait.ExponentialBackoff(c.backoff, func() (done bool, err error) { - requestID, statusCode, err = c.client.SendAndGetID(ctx, c.configurator.Config().DataReporting.UploadEndpoint, *s) + requestID, statusCode, err = c.client.SendAndGetID(ctx, c.secretConfigurator.Config().Endpoint, *s) if err != nil { // do no return the error if it's not the last attempt if c.backoff.Steps > 1 {