Skip to content

Commit

Permalink
run always, but try to shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
tremes committed Jun 9, 2023
1 parent 39e6f01 commit d29ea61
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 38 deletions.
8 changes: 4 additions & 4 deletions pkg/anonymization/anonymizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Anonymizer struct {
ipNetworkRegex *regexp.Regexp
secretsClient corev1client.SecretInterface
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver
configClient configv1client.ConfigV1Interface
networkClient networkv1client.NetworkV1Interface
gatherKubeClient kubernetes.Interface
Expand All @@ -104,7 +104,7 @@ func NewAnonymizer(clusterBaseDomain string,
networks []string,
secretsClient corev1client.SecretInterface,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver) (*Anonymizer, error) {
apiConfigurator configobserver.InsightsDataGatherObserver) (*Anonymizer, error) {
cidrs, err := k8snet.ParseCIDRs(networks)
if err != nil {
return nil, err
Expand Down Expand Up @@ -138,7 +138,7 @@ func NewAnonymizerFromConfigClient(
configClient configv1client.ConfigV1Interface,
networkClient networkv1client.NetworkV1Interface,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
) (*Anonymizer, error) {
baseDomain, err := utils.GetClusterBaseDomain(ctx, configClient)
if err != nil {
Expand Down Expand Up @@ -322,7 +322,7 @@ func NewAnonymizerFromConfig(
gatherProtoKubeConfig *rest.Config,
protoKubeConfig *rest.Config,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
) (*Anonymizer, error) {
kubeClient, err := kubernetes.NewForConfig(protoKubeConfig)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,35 @@ import (
"k8s.io/klog/v2"
)

type APIConfigObserver interface {
type InsightsDataGatherObserver interface {
factory.Controller
GatherConfig() *v1alpha1.GatherConfig
GatherDataPolicy() *v1alpha1.DataPolicy
GatherDisabled() bool
}

type APIConfigController struct {
type insightsDataGatherController struct {
factory.Controller
lock sync.Mutex
listeners map[chan *v1alpha1.GatherConfig]struct{}
configV1Alpha1Cli *configCliv1alpha1.ConfigV1alpha1Client
gatherConfig *v1alpha1.GatherConfig
featureEnabled bool
}

func NewAPIConfigObserver(kubeConfig *rest.Config,
func NewInsightsDataGatherObserver(kubeConfig *rest.Config,
eventRecorder events.Recorder,
configInformer configinformers.SharedInformerFactory) (APIConfigObserver, error) {
configInformer configinformers.SharedInformerFactory,
featureEnabled bool) (InsightsDataGatherObserver, error) {
inf := configInformer.Config().V1alpha1().InsightsDataGathers().Informer()
configV1Alpha1Cli, err := configCliv1alpha1.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}
c := &APIConfigController{
c := &insightsDataGatherController{
configV1Alpha1Cli: configV1Alpha1Cli,
listeners: make(map[chan *v1alpha1.GatherConfig]struct{}),
featureEnabled: featureEnabled,
}

insightDataGatherConf, err := c.configV1Alpha1Cli.InsightsDataGathers().Get(context.Background(), "cluster", metav1.GetOptions{})
Expand All @@ -51,42 +54,48 @@ func NewAPIConfigObserver(kubeConfig *rest.Config,

ctrl := factory.New().WithInformers(inf).
WithSync(c.sync).
ToController("InsightConfigController", eventRecorder)
ToController("InsightsDataGatherObserver", eventRecorder)
c.Controller = ctrl
return c, nil
}

func (a *APIConfigController) sync(ctx context.Context, _ factory.SyncContext) error {
insightDataGatherConf, err := a.configV1Alpha1Cli.InsightsDataGathers().Get(ctx, "cluster", metav1.GetOptions{})
func (i *insightsDataGatherController) sync(ctx context.Context, scx factory.SyncContext) error {
if !i.featureEnabled {
klog.Infof("Shutting down the queue and the event recorder for %s", i.Name())
scx.Queue().ShutDown()
scx.Recorder().Shutdown()
return nil
}
insightDataGatherConf, err := i.configV1Alpha1Cli.InsightsDataGathers().Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return err
}
a.gatherConfig = &insightDataGatherConf.Spec.GatherConfig
i.gatherConfig = &insightDataGatherConf.Spec.GatherConfig
return nil
}

// GatherConfig provides the complete gather config in a thread-safe way.
func (a *APIConfigController) GatherConfig() *v1alpha1.GatherConfig {
a.lock.Lock()
defer a.lock.Unlock()
return a.gatherConfig
func (i *insightsDataGatherController) GatherConfig() *v1alpha1.GatherConfig {
i.lock.Lock()
defer i.lock.Unlock()
return i.gatherConfig
}

// GatherDisabled tells whether data gathering is disabled or not
func (a *APIConfigController) GatherDisabled() bool {
a.lock.Lock()
defer a.lock.Unlock()
func (i *insightsDataGatherController) GatherDisabled() bool {
i.lock.Lock()
defer i.lock.Unlock()

if utils.StringInSlice("all", a.gatherConfig.DisabledGatherers) ||
utils.StringInSlice("ALL", a.gatherConfig.DisabledGatherers) {
if utils.StringInSlice("all", i.gatherConfig.DisabledGatherers) ||
utils.StringInSlice("ALL", i.gatherConfig.DisabledGatherers) {
return true
}
return false
}

// GatherDataPolicy provides DataPolicy attribute value defined in the API
func (a *APIConfigController) GatherDataPolicy() *v1alpha1.DataPolicy {
a.lock.Lock()
defer a.lock.Unlock()
return &a.gatherConfig.DataPolicy
func (i *insightsDataGatherController) GatherDataPolicy() *v1alpha1.DataPolicy {
i.lock.Lock()
defer i.lock.Unlock()
return &i.gatherConfig.DataPolicy
}
13 changes: 7 additions & 6 deletions pkg/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
if envVersion, exists := os.LookupEnv("RELEASE_VERSION"); exists {
desiredVersion = envVersion
}
apiConfigObserver, err := configobserver.NewAPIConfigObserver(gatherKubeConfig, controller.EventRecorder, configInformers)
if err != nil {
return err
}

// By default, this will exit(0) the process if the featuregates ever change to a different set of values.
featureGateAccessor := featuregates.NewFeatureGateAccess(
Expand Down Expand Up @@ -124,10 +120,15 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
return fmt.Errorf("can't create --path: %v", err)
}
}
if insightsConfigAPIEnabled {
go apiConfigObserver.Run(ctx, 1)
apiConfigObserver, err := configobserver.NewInsightsDataGatherObserver(gatherKubeConfig,
controller.EventRecorder, configInformers, insightsConfigAPIEnabled)
if err != nil {
return err
}

go apiConfigObserver.Run(ctx, 1)
go configInformers.Start(ctx.Done())

// secretConfigObserver synthesizes all config into the status reporter controller
secretConfigObserver := configobserver.New(s.Controller, kubeClient)
go secretConfigObserver.Start(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// and flushes the recorder to create archives
type Controller struct {
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver
recorder recorder.FlushInterface
gatherers []gatherers.Interface
statuses map[string]controllerstatus.StatusController
Expand All @@ -56,7 +56,7 @@ func New(
listGatherers []gatherers.Interface,
anonymizer *anonymization.Anonymizer,
insightsOperatorCLI operatorv1client.InsightsOperatorInterface,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
) *Controller {
statuses := make(map[string]controllerstatus.StatusController)

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Controller struct {

statusCh chan struct{}
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver

sources map[string]controllerstatus.StatusController
reported Reported
Expand All @@ -70,7 +70,7 @@ type Controller struct {
// NewController creates a statusMessage controller, responsible for monitoring the operators statusMessage and updating its cluster statusMessage accordingly.
func NewController(client configv1client.ConfigV1Interface,
secretConfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
namespace string) *Controller {
c := &Controller{
name: "insights",
Expand Down
4 changes: 2 additions & 2 deletions pkg/insights/insightsuploader/insightsuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Controller struct {
summarizer Summarizer
client *insightsclient.Client
secretConfigurator configobserver.Configurator
apiConfigurator configobserver.APIConfigObserver
apiConfigurator configobserver.InsightsDataGatherObserver
reporter StatusReporter
archiveUploaded chan struct{}
initialDelay time.Duration
Expand All @@ -45,7 +45,7 @@ type Controller struct {
func New(summarizer Summarizer,
client *insightsclient.Client,
secretconfigurator configobserver.Configurator,
apiConfigurator configobserver.APIConfigObserver,
apiConfigurator configobserver.InsightsDataGatherObserver,
statusReporter StatusReporter,
initialDelay time.Duration) *Controller {

Expand Down

0 comments on commit d29ea61

Please sign in to comment.