Skip to content

Commit

Permalink
on-demand data gathering (#815)
Browse files Browse the repository at this point in the history
  • Loading branch information
tremes authored Aug 10, 2023
1 parent 4efc924 commit d64d834
Show file tree
Hide file tree
Showing 17 changed files with 762 additions and 58 deletions.
12 changes: 2 additions & 10 deletions pkg/controller/gather_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (g *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res
}
}

return gather.RecordArchiveMetadata(mapToArray(allFunctionReports), rec, anonymizer)
return gather.RecordArchiveMetadata(gather.FunctionReportsMapToArray(allFunctionReports), rec, anonymizer)
}

// GatherAndUpload runs a single gather and stores the generated archive, uploads it.
Expand Down Expand Up @@ -191,7 +191,7 @@ func (g *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er

// record data
dataRecordedCon := status.DataRecordedCondition(metav1.ConditionTrue, "AsExpected", "")
lastArchive, err := record(mapToArray(allFunctionReports), rec, recdriver, anonymizer)
lastArchive, err := record(gather.FunctionReportsMapToArray(allFunctionReports), rec, recdriver, anonymizer)
if err != nil {
klog.Error("Failed to record data archive: %v", err)
dataRecordedCon.Status = metav1.ConditionFalse
Expand Down Expand Up @@ -287,14 +287,6 @@ func updateDataGatherStatus(ctx context.Context, insightsClient insightsv1alpha1
}
}

func mapToArray(m map[string]gather.GathererFunctionReport) []gather.GathererFunctionReport {
a := make([]gather.GathererFunctionReport, 0, len(m))
for _, v := range m {
a = append(a, v)
}
return a
}

// record is a helper function recording the archive metadata as well as data.
// Returns last known Insights archive and an error when recording failed.
func record(functionReports []gather.GathererFunctionReport,
Expand Down
16 changes: 13 additions & 3 deletions pkg/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
v1 "github.com/openshift/api/config/v1"
configv1client "github.com/openshift/client-go/config/clientset/versioned"
configv1informers "github.com/openshift/client-go/config/informers/externalversions"
insightsv1alpha1cli "github.com/openshift/client-go/insights/clientset/versioned/typed/insights/v1alpha1"
insightsv1alpha1client "github.com/openshift/client-go/insights/clientset/versioned"
insightsInformers "github.com/openshift/client-go/insights/informers/externalversions"
operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/openshift/library-go/pkg/operator/configobserver/featuregates"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
return err
}

insightClient, err := insightsv1alpha1cli.NewForConfig(controller.KubeConfig)
insightClient, err := insightsv1alpha1client.NewForConfig(controller.KubeConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -128,6 +129,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
}
}
var insightsDataGatherObserver configobserver.InsightsDataGatherObserver
var dgInformer periodic.DataGatherInformer
if insightsConfigAPIEnabled {
configInformersForTechPreview := configv1informers.NewSharedInformerFactory(configClient, 10*time.Minute)
insightsDataGatherObserver, err = configobserver.NewInsightsDataGatherObserver(gatherKubeConfig,
Expand All @@ -136,8 +138,16 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
return err
}

insightsInformersfactory := insightsInformers.NewSharedInformerFactory(insightClient, 10*time.Minute)
dgInformer, err = periodic.NewDataGatherInformer(controller.EventRecorder, insightsInformersfactory)
if err != nil {
return err
}

go insightsDataGatherObserver.Run(ctx, 1)
go configInformersForTechPreview.Start(ctx.Done())
go dgInformer.Run(ctx, 1)
go insightsInformersfactory.Start(ctx.Done())
}

// secretConfigObserver synthesizes all config into the status reporter controller
Expand Down Expand Up @@ -196,7 +206,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
} else {
reportRetriever := insightsreport.NewWithTechPreview(insightsClient, secretConfigObserver)
periodicGather = periodic.NewWithTechPreview(reportRetriever, secretConfigObserver,
insightsDataGatherObserver, gatherers, kubeClient, insightClient, operatorClient.InsightsOperators())
insightsDataGatherObserver, gatherers, kubeClient, insightClient.InsightsV1alpha1(), operatorClient.InsightsOperators(), dgInformer)
statusReporter.AddSources(periodicGather.Sources()...)
statusReporter.AddSources(reportRetriever)
go periodicGather.PeriodicPrune(ctx)
Expand Down
80 changes: 80 additions & 0 deletions pkg/controller/periodic/datagather_informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package periodic

import (
"context"
"strings"

insightsInformers "github.com/openshift/client-go/insights/informers/externalversions"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

const (
periodicGatheringPrefix = "periodic-gathering-"
)

// DataGatherInformer is an interface providing information
// about newly create DataGather resources
type DataGatherInformer interface {
factory.Controller
DataGatherCreated() <-chan string
}

// dataGatherController is type implementing DataGatherInformer
type dataGatherController struct {
factory.Controller
ch chan string
}

// NewDataGatherInformer creates a new instance of the DataGatherInformer interface
func NewDataGatherInformer(eventRecorder events.Recorder, insightsInf insightsInformers.SharedInformerFactory) (DataGatherInformer, error) {
inf := insightsInf.Insights().V1alpha1().DataGathers().Informer()

dgCtrl := &dataGatherController{
ch: make(chan string),
}
_, err := inf.AddEventHandler(dgCtrl.eventHandler())
if err != nil {
return nil, err
}

ctrl := factory.New().WithInformers(inf).
WithSync(dgCtrl.sync).
ToController("DataGatherInformer", eventRecorder)

dgCtrl.Controller = ctrl
return dgCtrl, nil
}

func (d *dataGatherController) sync(_ context.Context, _ factory.SyncContext) error {
return nil
}

// eventHandler returns a new ResourceEventHandler that handles the DataGather resources
// addition events. Resources with the prefix "periodic-gathering-" are filtered out to avoid conflicts
// with periodic data gathering.
func (d *dataGatherController) eventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dgMetadata, err := meta.Accessor(obj)
if err != nil {
klog.Errorf("Can't read metadata of newly added DataGather resource: %v", err)
return
}
// filter out dataGathers created for periodic gathering
if strings.HasPrefix(dgMetadata.GetName(), periodicGatheringPrefix) {
return
}
d.ch <- dgMetadata.GetName()
},
}
}

// DataGatherCreated returns a channel providing the name of
// newly created DataGather resource
func (d *dataGatherController) DataGatherCreated() <-chan string {
return d.ch
}
109 changes: 82 additions & 27 deletions pkg/controller/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Controller struct {
jobController *JobController
pruneInterval time.Duration
techPreview bool
dgInf DataGatherInformer
}

func NewWithTechPreview(
Expand All @@ -68,6 +69,8 @@ func NewWithTechPreview(
kubeClient kubernetes.Interface,
dataGatherClient insightsv1alpha1cli.InsightsV1alpha1Interface,
insightsOperatorCLI operatorv1client.InsightsOperatorInterface,
dgInf DataGatherInformer,

) *Controller {
statuses := make(map[string]controllerstatus.StatusController)

Expand All @@ -85,6 +88,7 @@ func NewWithTechPreview(
insightsOperatorCLI: insightsOperatorCLI,
pruneInterval: 1 * time.Hour,
techPreview: true,
dgInf: dgInf,
}
}

Expand Down Expand Up @@ -153,7 +157,11 @@ func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration) {
}
}

go wait.Until(func() { c.periodicTrigger(stopCh) }, time.Second, stopCh)
if c.techPreview {
go wait.Until(func() { c.periodicTriggerTechPreview(stopCh) }, time.Second, stopCh)
} else {
go wait.Until(func() { c.periodicTrigger(stopCh) }, time.Second, stopCh)
}

<-stopCh
}
Expand Down Expand Up @@ -218,7 +226,7 @@ func (c *Controller) Gather() {
if err != nil {
klog.Errorf("failed to update the Insights Operator CR status: %v", err)
}
err = gather.RecordArchiveMetadata(mapToArray(allFunctionReports), c.recorder, c.anonymizer)
err = gather.RecordArchiveMetadata(gather.FunctionReportsMapToArray(allFunctionReports), c.recorder, c.anonymizer)
if err != nil {
klog.Errorf("unable to record archive metadata because of error: %v", err)
}
Expand Down Expand Up @@ -246,16 +254,53 @@ func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
klog.Infof("Gathering cluster info every %s", interval)

case <-time.After(interval):
if c.techPreview {
c.GatherJob()
} else {
c.Gather()
c.Gather()
}
}
}

// periodicTriggerTechPreview is a techpreview alternative to the same function above,
// but this adds a listerner for the dataGatherInforme, which is nil (not initialized) in
// non-techpreview clusters.
func (c *Controller) periodicTriggerTechPreview(stopCh <-chan struct{}) {
configCh, closeFn := c.secretConfigurator.ConfigChanged()
defer closeFn()

ctx, cancel := context.WithTimeout(context.Background(), c.secretConfigurator.Config().Interval*4)
defer cancel()

interval := c.secretConfigurator.Config().Interval
klog.Infof("Gathering cluster info every %s", interval)
for {
select {
case <-stopCh:
return

case <-configCh:
newInterval := c.secretConfigurator.Config().Interval
if newInterval == interval {
continue
}
interval = newInterval
klog.Infof("Gathering cluster info every %s", interval)

case <-time.After(interval):
c.GatherJob()

// lister to on-demand dataGather creations
case dgName := <-c.dgInf.DataGatherCreated():
err := c.updateNewDataGatherCRStatus(ctx, dgName)
if err != nil {
klog.Errorf("Failed to update status of the %s DataGather resource: %v", dgName, err)
return
}
klog.Infof("Starting on-demand data gathering for the %s DataGather resource", dgName)
go c.runJobAndCheckResults(ctx, dgName)
}
}
}

func (c *Controller) GatherJob() { // nolint: funlen, gocyclo
func (c *Controller) GatherJob() {
if c.isGatheringDisabled() {
klog.V(3).Info("Gather is disabled by configuration.")
return
Expand All @@ -279,9 +324,17 @@ func (c *Controller) GatherJob() { // nolint: funlen, gocyclo
klog.Errorf("Failed to create a new DataGather resource: %v", err)
return
}
err = c.updateNewDataGatherCRStatus(ctx, dataGatherCR.GetName())
if err != nil {
klog.Errorf("Failed to update status of the %s DataGather resource: %v", dataGatherCR.GetName(), err)
return
}
c.runJobAndCheckResults(ctx, dataGatherCR.Name)
}

func (c *Controller) runJobAndCheckResults(ctx context.Context, dataGatherName string) {
// create a new periodic gathering job
gj, err := c.jobController.CreateGathererJob(ctx, dataGatherCR.Name, c.image, c.secretConfigurator.Config().StoragePath)
gj, err := c.jobController.CreateGathererJob(ctx, dataGatherName, c.image, c.secretConfigurator.Config().StoragePath)
if err != nil {
klog.Errorf("Failed to create a new job: %v", err)
return
Expand All @@ -297,9 +350,9 @@ func (c *Controller) GatherJob() { // nolint: funlen, gocyclo
klog.Error(err)
}
klog.Infof("Job completed %s", gj.Name)
dataGatherFinished, err := c.dataGatherClient.DataGathers().Get(ctx, dataGatherCR.Name, metav1.GetOptions{})
dataGatherFinished, err := c.dataGatherClient.DataGathers().Get(ctx, dataGatherName, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get DataGather resource %s: %v", dataGatherCR.Name, err)
klog.Errorf("Failed to get DataGather resource %s: %v", dataGatherName, err)
return
}
uploaded := c.wasDataUploaded(dataGatherFinished)
Expand Down Expand Up @@ -500,13 +553,13 @@ func (c *Controller) PeriodicPrune(ctx context.Context) {
}

// createNewDataGatherCR creates a new "datagather.insights.openshift.io" custom resource
// with generate name prefix "periodic-gathering-". Returns the name of the newly created
// resource
// with generate name prefix "periodic-gathering-". Returns the newly created
// resource or an error if the creation failed.
func (c *Controller) createNewDataGatherCR(ctx context.Context, disabledGatherers []string,
dataPolicy insightsv1alpha1.DataPolicy) (*insightsv1alpha1.DataGather, error) {
dataGatherCR := insightsv1alpha1.DataGather{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "periodic-gathering-",
GenerateName: periodicGatheringPrefix,
},
Spec: insightsv1alpha1.DataGatherSpec{
DataPolicy: dataPolicy,
Expand All @@ -522,18 +575,28 @@ func (c *Controller) createNewDataGatherCR(ctx context.Context, disabledGatherer
if err != nil {
return nil, err
}
dataGather.Status.Conditions = []metav1.Condition{
klog.Infof("Created a new %s DataGather custom resource", dataGather.Name)
return dataGather, nil
}

// updateNewDataGatherCRStatus updates the newly created DataGather custom resource status to
// set the initial unknown conditions and also the DataGather state to pending.
func (c *Controller) updateNewDataGatherCRStatus(ctx context.Context, dgName string) error {
dg, err := c.dataGatherClient.DataGathers().Get(ctx, dgName, metav1.GetOptions{})
if err != nil {
return err
}
dg.Status.Conditions = []metav1.Condition{
status.DataUploadedCondition(metav1.ConditionUnknown, status.NoUploadYetReason, ""),
status.DataRecordedCondition(metav1.ConditionUnknown, status.NoDataGatheringYetReason, ""),
status.DataProcessedCondition(metav1.ConditionUnknown, status.NothingToProcessYetReason, ""),
}
dataGather.Status.State = insightsv1alpha1.Pending
dataGather, err = c.dataGatherClient.DataGathers().UpdateStatus(ctx, dataGather, metav1.UpdateOptions{})
dg.Status.State = insightsv1alpha1.Pending
_, err = c.dataGatherClient.DataGathers().UpdateStatus(ctx, dg, metav1.UpdateOptions{})
if err != nil {
return nil, err
return err
}
klog.Infof("Created a new %s DataGather custom resource", dataGather.Name)
return dataGather, nil
return nil
}

// createDataGatherAttributeValues reads the current "insightsdatagather.config.openshift.io" configuration
Expand Down Expand Up @@ -619,11 +682,3 @@ func updateMetrics(dataGather *insightsv1alpha1.DataGather) {
}
insights.IncrementCounterRequestSend(statusCode)
}

func mapToArray(m map[string]gather.GathererFunctionReport) []gather.GathererFunctionReport {
a := make([]gather.GathererFunctionReport, 0, len(m))
for _, v := range m {
a = append(a, v)
}
return a
}
Loading

0 comments on commit d64d834

Please sign in to comment.