Skip to content

Commit

Permalink
next
Browse files Browse the repository at this point in the history
  • Loading branch information
tremes committed Jun 9, 2023
1 parent 39e6f01 commit 4601d89
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 36 deletions.
8 changes: 4 additions & 4 deletions pkg/anonymization/anonymizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Anonymizer struct {
ipNetworkRegex *regexp.Regexp
secretsClient corev1client.SecretInterface
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver
configClient configv1client.ConfigV1Interface
networkClient networkv1client.NetworkV1Interface
gatherKubeClient kubernetes.Interface
Expand All @@ -104,7 +104,7 @@ func NewAnonymizer(clusterBaseDomain string,
networks []string,
secretsClient corev1client.SecretInterface,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver) (*Anonymizer, error) {
apiConfigurator configobserver.InsightsDataGatherObserver) (*Anonymizer, error) {
cidrs, err := k8snet.ParseCIDRs(networks)
if err != nil {
return nil, err
Expand Down Expand Up @@ -138,7 +138,7 @@ func NewAnonymizerFromConfigClient(
configClient configv1client.ConfigV1Interface,
networkClient networkv1client.NetworkV1Interface,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
) (*Anonymizer, error) {
baseDomain, err := utils.GetClusterBaseDomain(ctx, configClient)
if err != nil {
Expand Down Expand Up @@ -322,7 +322,7 @@ func NewAnonymizerFromConfig(
gatherProtoKubeConfig *rest.Config,
protoKubeConfig *rest.Config,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
) (*Anonymizer, error) {
kubeClient, err := kubernetes.NewForConfig(protoKubeConfig)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,32 @@ import (
"k8s.io/klog/v2"
)

type APIConfigObserver interface {
type InsightsDataGatherObserver interface {
factory.Controller
RunWrapper(context.Context, int, bool)
GatherConfig() *v1alpha1.GatherConfig
GatherDataPolicy() *v1alpha1.DataPolicy
GatherDisabled() bool
}

type APIConfigController struct {
type InsightsDataGatherController struct {
factory.Controller
lock sync.Mutex
listeners map[chan *v1alpha1.GatherConfig]struct{}
configV1Alpha1Cli *configCliv1alpha1.ConfigV1alpha1Client
gatherConfig *v1alpha1.GatherConfig
run bool
}

func NewAPIConfigObserver(kubeConfig *rest.Config,
func NewInsightsDataGatherObserver(kubeConfig *rest.Config,
eventRecorder events.Recorder,
configInformer configinformers.SharedInformerFactory) (APIConfigObserver, error) {
configInformer configinformers.SharedInformerFactory) (InsightsDataGatherObserver, error) {
inf := configInformer.Config().V1alpha1().InsightsDataGathers().Informer()
configV1Alpha1Cli, err := configCliv1alpha1.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}
c := &APIConfigController{
c := &InsightsDataGatherController{
configV1Alpha1Cli: configV1Alpha1Cli,
listeners: make(map[chan *v1alpha1.GatherConfig]struct{}),
}
Expand All @@ -56,36 +58,54 @@ func NewAPIConfigObserver(kubeConfig *rest.Config,
return c, nil
}

func (a *APIConfigController) sync(ctx context.Context, _ factory.SyncContext) error {
insightDataGatherConf, err := a.configV1Alpha1Cli.InsightsDataGathers().Get(ctx, "cluster", metav1.GetOptions{})
// RunWrapper is a helper (hack) function to run the controller only when the corresponding feature
// is enabled. Otherwise we want to shutdown the queue and exit the sync loop. This can be removed once the
// InsightsDataGather API is promoted.
func (i *InsightsDataGatherController) RunWrapper(ctx context.Context, workers int, enabled bool) {
if enabled {
i.Controller.Run(ctx, workers)
i.run = true
}
}

func (i *InsightsDataGatherController) sync(ctx context.Context, sctx factory.SyncContext) error {
// shutdown the queue if the informer (and its cache) doesn't need to sync (run)
if !i.run {
klog.Infof("Shutting down the queue for the %s", i.Name())
sctx.Queue().ShutDown()
return nil
}

insightDataGatherConf, err := i.configV1Alpha1Cli.InsightsDataGathers().Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return err
}
a.gatherConfig = &insightDataGatherConf.Spec.GatherConfig
i.gatherConfig = &insightDataGatherConf.Spec.GatherConfig

return nil
}

// GatherConfig provides the complete gather config in a thread-safe way.
func (a *APIConfigController) GatherConfig() *v1alpha1.GatherConfig {
a.lock.Lock()
defer a.lock.Unlock()
return a.gatherConfig
func (i *InsightsDataGatherController) GatherConfig() *v1alpha1.GatherConfig {
i.lock.Lock()
defer i.lock.Unlock()
return i.gatherConfig
}

// GatherDisabled tells whether data gathering is disabled or not
func (a *APIConfigController) GatherDisabled() bool {
a.lock.Lock()
defer a.lock.Unlock()
func (i *InsightsDataGatherController) GatherDisabled() bool {
i.lock.Lock()
defer i.lock.Unlock()

if utils.StringInSlice("all", a.gatherConfig.DisabledGatherers) ||
utils.StringInSlice("ALL", a.gatherConfig.DisabledGatherers) {
if utils.StringInSlice("all", i.gatherConfig.DisabledGatherers) ||
utils.StringInSlice("ALL", i.gatherConfig.DisabledGatherers) {
return true
}
return false
}

// GatherDataPolicy provides DataPolicy attribute value defined in the API
func (a *APIConfigController) GatherDataPolicy() *v1alpha1.DataPolicy {
func (a *InsightsDataGatherController) GatherDataPolicy() *v1alpha1.DataPolicy {
a.lock.Lock()
defer a.lock.Unlock()
return &a.gatherConfig.DataPolicy
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/mock_configurator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import (
"context"

"github.com/openshift/api/config/v1alpha1"
"github.com/openshift/insights-operator/pkg/utils"
"github.com/openshift/library-go/pkg/controller/factory"
Expand Down Expand Up @@ -46,6 +48,10 @@ func (mc *MockAPIConfigurator) GatherConfig() *v1alpha1.GatherConfig {
return mc.config
}

func (mc *MockAPIConfigurator) RunWrapper(ctx context.Context, _ int, enabled bool) {
// no-op
}

func (mc *MockAPIConfigurator) GatherDisabled() bool {
if mc.config != nil {
if utils.StringInSlice("all", mc.config.DisabledGatherers) ||
Expand Down
15 changes: 7 additions & 8 deletions pkg/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
if envVersion, exists := os.LookupEnv("RELEASE_VERSION"); exists {
desiredVersion = envVersion
}
apiConfigObserver, err := configobserver.NewAPIConfigObserver(gatherKubeConfig, controller.EventRecorder, configInformers)
insightsDataGatherObserver, err := configobserver.NewInsightsDataGatherObserver(gatherKubeConfig, controller.EventRecorder, configInformers)
if err != nil {
return err
}
Expand Down Expand Up @@ -124,21 +124,20 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
return fmt.Errorf("can't create --path: %v", err)
}
}
if insightsConfigAPIEnabled {
go apiConfigObserver.Run(ctx, 1)
}

go insightsDataGatherObserver.RunWrapper(ctx, 1, insightsConfigAPIEnabled)

// secretConfigObserver synthesizes all config into the status reporter controller
secretConfigObserver := configobserver.New(s.Controller, kubeClient)
go secretConfigObserver.Start(ctx)

// the status controller initializes the cluster operator object and retrieves
// the last sync time, if any was set
statusReporter := status.NewController(configClient.ConfigV1(), secretConfigObserver, apiConfigObserver, os.Getenv("POD_NAMESPACE"))
statusReporter := status.NewController(configClient.ConfigV1(), secretConfigObserver, insightsDataGatherObserver, os.Getenv("POD_NAMESPACE"))

// anonymizer is responsible for anonymizing sensitive data, it can be configured to disable specific anonymization
anonymizer, err := anonymization.NewAnonymizerFromConfig(ctx, gatherKubeConfig,
gatherProtoKubeConfig, controller.ProtoKubeConfig, secretConfigObserver, apiConfigObserver)
gatherProtoKubeConfig, controller.ProtoKubeConfig, secretConfigObserver, insightsDataGatherObserver)
if err != nil {
// in case of an error anonymizer will be nil and anonymization will be just skipped
klog.Errorf(anonymization.UnableToCreateAnonymizerErrorMessage, err)
Expand Down Expand Up @@ -169,7 +168,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, alertsGatherKubeConfig, anonymizer,
secretConfigObserver, insightsClient,
)
periodicGather := periodic.New(secretConfigObserver, rec, gatherers, anonymizer, operatorClient.InsightsOperators(), apiConfigObserver)
periodicGather := periodic.New(secretConfigObserver, rec, gatherers, anonymizer, operatorClient.InsightsOperators(), insightsDataGatherObserver)
statusReporter.AddSources(periodicGather.Sources()...)

// check we can read IO container status and we are not in crash loop
Expand All @@ -185,7 +184,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller

// upload results to the provided client - if no client is configured reporting
// is permanently disabled, but if a client does exist the server may still disable reporting
uploader := insightsuploader.New(recdriver, insightsClient, secretConfigObserver, apiConfigObserver, statusReporter, initialDelay)
uploader := insightsuploader.New(recdriver, insightsClient, secretConfigObserver, insightsDataGatherObserver, statusReporter, initialDelay)
statusReporter.AddSources(uploader)

// start reporting status now that all controller loops are added as sources
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// and flushes the recorder to create archives
type Controller struct {
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver
recorder recorder.FlushInterface
gatherers []gatherers.Interface
statuses map[string]controllerstatus.StatusController
Expand All @@ -56,7 +56,7 @@ func New(
listGatherers []gatherers.Interface,
anonymizer *anonymization.Anonymizer,
insightsOperatorCLI operatorv1client.InsightsOperatorInterface,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
) *Controller {
statuses := make(map[string]controllerstatus.StatusController)

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Controller struct {

statusCh chan struct{}
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver

sources map[string]controllerstatus.StatusController
reported Reported
Expand All @@ -70,7 +70,7 @@ type Controller struct {
// NewController creates a statusMessage controller, responsible for monitoring the operators statusMessage and updating its cluster statusMessage accordingly.
func NewController(client configv1client.ConfigV1Interface,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
namespace string) *Controller {
c := &Controller{
name: "insights",
Expand Down
4 changes: 2 additions & 2 deletions pkg/insights/insightsuploader/insightsuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Controller struct {
summarizer Summarizer
client *insightsclient.Client
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver
reporter StatusReporter
archiveUploaded chan struct{}
initialDelay time.Duration
Expand All @@ -45,7 +45,7 @@ type Controller struct {
func New(summarizer Summarizer,
client *insightsclient.Client,
secretconfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
statusReporter StatusReporter,
initialDelay time.Duration) *Controller {

Expand Down

0 comments on commit 4601d89

Please sign in to comment.