Bug 1954640: Support of gatherers with different periods #399

Merged
312 changes: 177 additions & 135 deletions docs/insights-archive-sample/insights-operator/gathers.json

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion pkg/config/config.go
@@ -37,7 +37,16 @@ type Controller struct {
ReportMinRetryTime time.Duration
ReportPullingTimeout time.Duration
Impersonate string
Gather []string
// Gather is the list of gathering functions to enable.
// If it contains the string "ALL", everything is enabled.
// Otherwise, each entry consists of two parts:
// the gatherer name and the function name, separated by a slash.
// Example: []string{
//   "clusterconfig/container_images",
//   "clusterconfig/nodes",
//   "clusterconfig/authentication",
// }
Gather []string
// EnableGlobalObfuscation enables obfuscation of domain names and IP addresses
// To see the detailed info about how anonymization works, go to the docs of package anonymization.
EnableGlobalObfuscation bool
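
For illustration, a minimal sketch (not part of this diff) of how the new Gather field might be populated; the function names come from the comment above, and the catch-all "ALL" value also appears as gather.AllGatherersConst in the new periodic tests further down.

package main

import (
	"fmt"

	"github.com/openshift/insights-operator/pkg/config"
)

func main() {
	// Enable only a subset of the clusterconfig gathering functions.
	subset := &config.Controller{
		Gather: []string{
			"clusterconfig/container_images",
			"clusterconfig/nodes",
		},
	}

	// Or enable every gathering function of every gatherer.
	everything := &config.Controller{
		Gather: []string{"ALL"},
	}

	fmt.Println(len(subset.Gather), len(everything.Gather))
}
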
5 changes: 5 additions & 0 deletions pkg/config/configobserver/configobserver.go
@@ -22,6 +22,11 @@ type ConfigReporter interface {
SetConfig(*config.Controller)
}

type Configurator interface {
Config() *config.Controller
ConfigChanged() (<-chan struct{}, func())
}

// Responsible for periodically checking and (if necessary) updating the local configs/tokens
// according to the configs/tokens present on the cluster.
type Controller struct {
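
A hypothetical consumer sketch (not from this PR) of the extracted Configurator interface. It assumes the func returned by ConfigChanged unsubscribes the listener; the MockConfigurator below returns nil, nil, so tests don't exercise that path.

package example

import (
	"context"

	"k8s.io/klog/v2"

	"github.com/openshift/insights-operator/pkg/config/configobserver"
)

// watchInterval logs the gather interval whenever the configuration changes.
func watchInterval(ctx context.Context, configurator configobserver.Configurator) {
	changed, unsubscribe := configurator.ConfigChanged()
	defer unsubscribe()

	for {
		select {
		case <-ctx.Done():
			return
		case <-changed:
			klog.V(2).Infof("gather interval is now %s", configurator.Config().Interval)
		}
	}
}
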
13 changes: 13 additions & 0 deletions pkg/config/mock_configurator.go
@@ -0,0 +1,13 @@
package config

// MockConfigurator returns the config from the Conf field
type MockConfigurator struct {
Conf *Controller
}

func (mc *MockConfigurator) Config() *Controller {
return mc.Conf
}
func (mc *MockConfigurator) ConfigChanged() (<-chan struct{}, func()) {
return nil, nil
}
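
Since configobserver already imports config, the mock presumably can't live next to the interface without creating an import cycle. A hypothetical way to keep the two in sync is a compile-time assertion in any package that imports both, for example alongside the periodic tests:

package periodic

import (
	"github.com/openshift/insights-operator/pkg/config"
	"github.com/openshift/insights-operator/pkg/config/configobserver"
)

// This fails to compile if MockConfigurator stops satisfying the
// Configurator interface that periodic.New now expects.
var _ configobserver.Configurator = (*config.MockConfigurator)(nil)
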
25 changes: 15 additions & 10 deletions pkg/controller/gather_job.go
@@ -16,7 +16,7 @@ import (
"github.com/openshift/insights-operator/pkg/anonymization"
"github.com/openshift/insights-operator/pkg/config"
"github.com/openshift/insights-operator/pkg/config/configobserver"
"github.com/openshift/insights-operator/pkg/gather/clusterconfig"
"github.com/openshift/insights-operator/pkg/gather"
"github.com/openshift/insights-operator/pkg/recorder"
"github.com/openshift/insights-operator/pkg/recorder/diskrecorder"
)
@@ -69,8 +69,7 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res

var anonymizer *anonymization.Anonymizer
if anonymization.IsObfuscationEnabled(configObserver) {
var configClient *configv1client.ConfigV1Client
configClient, err = configv1client.NewForConfig(kubeConfig)
configClient, err := configv1client.NewForConfig(kubeConfig)
if err != nil {
return err
}
@@ -86,12 +85,18 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res
rec := recorder.New(recdriver, d.Interval, anonymizer)
defer rec.Flush()

// the gatherers check the state of the cluster and report any
// config to the recorder
clusterConfigGatherer := clusterconfig.New(gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, anonymizer)
err = clusterConfigGatherer.Gather(ctx, configObserver.Config().Gather, rec)
if err != nil {
return err
gatherers := gather.CreateAllGatherers(
gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, anonymizer,
)

var allFunctionReports []gather.GathererFunctionReport
for _, gatherer := range gatherers {
functionReports, err := gather.CollectAndRecordGatherer(ctx, gatherer, rec, configObserver)
if err != nil {
return err
}
allFunctionReports = append(allFunctionReports, functionReports...)
}
return nil

return gather.RecordArchiveMetadata(allFunctionReports, rec, anonymizer)
}
14 changes: 6 additions & 8 deletions pkg/controller/operator.go
@@ -26,7 +26,6 @@ import (
"github.com/openshift/insights-operator/pkg/controller/periodic"
"github.com/openshift/insights-operator/pkg/controller/status"
"github.com/openshift/insights-operator/pkg/gather"
"github.com/openshift/insights-operator/pkg/gather/clusterconfig"
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
"github.com/openshift/insights-operator/pkg/insights/insightsreport"
"github.com/openshift/insights-operator/pkg/insights/insightsuploader"
@@ -120,14 +119,12 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
rec := recorder.New(recdriver, s.Interval, anonymizer)
go rec.PeriodicallyPrune(ctx, statusReporter)

// the gatherers periodically check the state of the cluster and report any
// config to the recorder
clusterConfigGatherer := clusterconfig.New(
// the gatherers are periodically called to collect the data from the cluster
// and provide the results for the recorder
gatherers := gather.CreateAllGatherers(
gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, anonymizer,
)
periodicGather := periodic.New(configObserver, rec, map[string]gather.Interface{
"clusterconfig": clusterConfigGatherer,
})
periodicGather := periodic.New(configObserver, rec, gatherers, anonymizer)
statusReporter.AddSources(periodicGather.Sources()...)

// check we can read IO container status and we are not in crash loop
@@ -168,9 +165,10 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
reportGatherer := insightsreport.New(insightsClient, configObserver, uploader)
go reportGatherer.Run(ctx)

klog.Warning("stopped")
klog.Warning("started")

<-ctx.Done()

return nil
}

94 changes: 60 additions & 34 deletions pkg/controller/periodic/periodic.go
@@ -6,42 +6,51 @@ import (
"sort"
"time"

"k8s.io/klog/v2"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"github.com/openshift/insights-operator/pkg/config"
"github.com/openshift/insights-operator/pkg/anonymization"
"github.com/openshift/insights-operator/pkg/config/configobserver"
"github.com/openshift/insights-operator/pkg/controller/status"
"github.com/openshift/insights-operator/pkg/controllerstatus"
"github.com/openshift/insights-operator/pkg/gather"
"github.com/openshift/insights-operator/pkg/gatherers"
"github.com/openshift/insights-operator/pkg/recorder"
)

type Configurator interface {
Config() *config.Controller
ConfigChanged() (<-chan struct{}, func())
}

// Controller periodically runs gatherers, records their results to the recorder
// and flushes the recorder to create archives
type Controller struct {
configurator Configurator
configurator configobserver.Configurator
recorder recorder.FlushInterface
gatherers map[string]gather.Interface
gatherers []gatherers.Interface
statuses map[string]*controllerstatus.Simple
anonymizer *anonymization.Anonymizer
}

func New(configurator Configurator, rec recorder.FlushInterface, gatherers map[string]gather.Interface) *Controller {
// New creates a new instance of Controller which periodically invokes the gatherers
// and flushes the recorder to create archives.
func New(
configurator configobserver.Configurator,
rec recorder.FlushInterface,
gatherers []gatherers.Interface,
anonymizer *anonymization.Anonymizer,
) *Controller {
statuses := make(map[string]*controllerstatus.Simple)
for k := range gatherers {
statuses[k] = &controllerstatus.Simple{Name: fmt.Sprintf("periodic-%s", k)}

for _, gatherer := range gatherers {
gathererName := gatherer.GetName()
statuses[gathererName] = &controllerstatus.Simple{Name: fmt.Sprintf("periodic-%s", gathererName)}
}
c := &Controller{

return &Controller{
configurator: configurator,
recorder: rec,
gatherers: gatherers,
statuses: statuses,
anonymizer: anonymizer,
}
return c
}

func (c *Controller) Sources() []controllerstatus.Interface {
@@ -98,15 +107,46 @@ func (c *Controller) Gather() {
Steps: threshold,
Cap: interval,
}
for name := range c.gatherers {

// flush when all necessary gatherers were processed
defer func() {
if err := c.recorder.Flush(); err != nil {
klog.Errorf("Unable to flush the recorder: %v", err)
}
}()

var gatherersToProcess []gatherers.Interface

for _, gatherer := range c.gatherers {
if g, ok := gatherer.(gatherers.CustomPeriodGatherer); ok {
if g.ShouldBeProcessedNow() {
gatherersToProcess = append(gatherersToProcess, g)
g.UpdateLastProcessingTime()
}
} else {
gatherersToProcess = append(gatherersToProcess, gatherer)
}
}

var allFunctionReports []gather.GathererFunctionReport

for _, gatherer := range gatherersToProcess {
_ = wait.ExponentialBackoff(backoff, func() (bool, error) {
name := gatherer.GetName()
start := time.Now()
err := c.runGatherer(name)

ctx, cancel := context.WithTimeout(context.Background(), c.configurator.Config().Interval/2)

Review thread on this line:

Contributor: Lol... so I was experimenting with setting some short timeout here (e.g. 10s) and the log kinda exploded... especially due to the two following gatherers:

  • olm_operators
  • service_accounts

Contributor: I was experimenting with this again and maybe I would remove the extra logging statement in https://github.com/openshift/insights-operator/pull/399/files#diff-bdd45c95687562d23e5fdd43dea644d8fb86c95354192cedd72835743ba9fc0fR104. WDYT? I think the error should be obvious from a particular gatherer function and we shouldn't log it again.

I noticed it's almost the same now in https://github.com/openshift/insights-operator/blob/master/pkg/gather/clusterconfig/0_gatherer.go#L216, but the verbosity level is 5.

Contributor (author): But where else do we log it?

Contributor: The particular gatherer should log its own error IMO, and it should also be visible in the metadata.

Contributor (author): But if we forget to log the error in the gatherer, we're only gonna get it in the metadata.

Contributor: Yes, what I am trying to say is that it should be the gatherer's responsibility (i.e. we shouldn't forget). The metadata is most important for me, because we usually don't have the IO log in our archive.

Contributor (author): Well, from my point of view it's better to do it once in the calling code than many times in the gatherers. But I agree that metadata is more important than logs for us.
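
For context on the verbosity point above: klog only emits V-gated messages when the operator runs with a matching -v flag, so a V(5) line like the one referenced in 0_gatherer.go is hidden by default, while Errorf is always visible. A small illustrative sketch (not code from this PR; the helper name is made up):

package example

import "k8s.io/klog/v2"

// logGathererError is a made-up helper showing the difference in visibility.
func logGathererError(gathererName string, err error) {
	// Always present in the operator log, regardless of verbosity.
	klog.Errorf("gatherer %s failed: %v", gathererName, err)

	// Only present when the operator is started with --v=5 or higher.
	klog.V(5).Infof("gatherer %s failed: %v", gathererName, err)
}
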

defer cancel()

klog.V(4).Infof("Running %s gatherer", gatherer.GetName())
functionReports, err := gather.CollectAndRecordGatherer(ctx, gatherer, c.recorder, c.configurator)
allFunctionReports = append(allFunctionReports, functionReports...)
if err == nil {
klog.V(3).Infof("Periodic gather %s completed in %s", name, time.Since(start).Truncate(time.Millisecond))
c.statuses[name].UpdateStatus(controllerstatus.Summary{Healthy: true})
return true, nil
}

utilruntime.HandleError(fmt.Errorf("%v failed after %s with: %v", name, time.Since(start).Truncate(time.Millisecond), err))
c.statuses[name].UpdateStatus(
controllerstatus.Summary{
@@ -117,25 +157,11 @@ func (c *Controller) Gather() {
return false, nil
})
}
}

// Does the prep for running a gatherer then calls gatherer.Gather. (getting the context, cleaning the recorder)
func (c *Controller) runGatherer(name string) error {
gatherer, ok := c.gatherers[name]
if !ok {
klog.V(2).Infof("No such gatherer %s", name)
return nil
err := gather.RecordArchiveMetadata(allFunctionReports, c.recorder, c.anonymizer)
if err != nil {
klog.Errorf("unable to record archive metadata because of error: %v", err)
}
timeoutDuration := c.configurator.Config().Interval / 2
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()
defer func() {
if err := c.recorder.Flush(); err != nil {
klog.Errorf("Unable to flush recorder: %v", err)
}
}()
klog.V(4).Infof("Running %s", name)
return gatherer.Gather(ctx, c.configurator.Config().Gather, c.recorder)
}

// Periodically starts the gathering.
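
The dispatch loop above relies on only three methods, so the relevant parts of the new pkg/gatherers interfaces presumably look roughly like the sketch below (names taken from the calls in this diff; the actual gathering method is omitted because its signature isn't visible here). The toy implementation illustrates the intended contract: ShouldBeProcessedNow reports whether the gatherer's own period has elapsed, and UpdateLastProcessingTime is called only when the gatherer is actually processed.

package gatherers

import "time"

// Interface is the part of the base gatherer contract visible in this diff.
type Interface interface {
	GetName() string
	// the actual gathering method(s) are omitted; see pkg/gatherers in the PR
}

// CustomPeriodGatherer is the optional extension the periodic controller
// checks for: such gatherers run on their own period instead of every cycle.
type CustomPeriodGatherer interface {
	Interface
	ShouldBeProcessedNow() bool
	UpdateLastProcessingTime()
}

// fixedPeriodGatherer is a hypothetical implementation that asks to be
// processed only when at least `period` has elapsed since the previous run.
type fixedPeriodGatherer struct {
	name          string
	period        time.Duration
	lastProcessed time.Time
}

func (g *fixedPeriodGatherer) GetName() string { return g.name }

func (g *fixedPeriodGatherer) ShouldBeProcessedNow() bool {
	return time.Since(g.lastProcessed) >= g.period
}

func (g *fixedPeriodGatherer) UpdateLastProcessingTime() {
	g.lastProcessed = time.Now()
}

var _ CustomPeriodGatherer = (*fixedPeriodGatherer)(nil)
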
76 changes: 76 additions & 0 deletions pkg/controller/periodic/periodic_test.go
@@ -0,0 +1,76 @@
package periodic

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/openshift/insights-operator/pkg/config"
"github.com/openshift/insights-operator/pkg/gather"
"github.com/openshift/insights-operator/pkg/gatherers"
"github.com/openshift/insights-operator/pkg/recorder"
)

func Test_Controller_CustomPeriodGatherer(t *testing.T) {
c, mockRecorder := getMocksForPeriodicTest([]gatherers.Interface{
&gather.MockGatherer{CanFail: true},
&gather.MockCustomPeriodGatherer{Period: 999 * time.Hour},
})

c.Gather()
// 6 gatherers + metadata
assert.Len(t, mockRecorder.Records, 7)
mockRecorder.Reset()

c.Gather()
// 5 gatherers + metadata (one has a custom period and we need to wait 999 hours to get it again)
assert.Len(t, mockRecorder.Records, 6)
mockRecorder.Reset()
}

func Test_Controller_CustomPeriodGathererNoPeriod(t *testing.T) {
mockGatherer := gather.MockCustomPeriodGathererNoPeriod{ShouldBeProcessed: true}
c, mockRecorder := getMocksForPeriodicTest([]gatherers.Interface{
&gather.MockGatherer{CanFail: true},
&mockGatherer,
})

c.Gather()
// 6 gatherers + metadata
assert.Len(t, mockRecorder.Records, 7)
assert.Equal(t, 1, mockGatherer.ShouldBeProcessedNowWasCalledNTimes)
assert.Equal(t, 1, mockGatherer.UpdateLastProcessingTimeWasCalledNTimes)
mockRecorder.Reset()

mockGatherer.ShouldBeProcessed = false

c.Gather()
// 5 gatherers + metadata (we've just disabled one gatherer)
assert.Len(t, mockRecorder.Records, 6)
assert.Equal(t, 2, mockGatherer.ShouldBeProcessedNowWasCalledNTimes)
// ShouldBeProcessedNow returned false, so we didn't call UpdateLastProcessingTime
assert.Equal(t, 1, mockGatherer.UpdateLastProcessingTimeWasCalledNTimes)
mockRecorder.Reset()

mockGatherer.ShouldBeProcessed = true

c.Gather()
assert.Len(t, mockRecorder.Records, 7)
assert.Equal(t, 3, mockGatherer.ShouldBeProcessedNowWasCalledNTimes)
assert.Equal(t, 2, mockGatherer.UpdateLastProcessingTimeWasCalledNTimes)
mockRecorder.Reset()
}

// TODO: cover more things

func getMocksForPeriodicTest(gatherers []gatherers.Interface) (*Controller, *recorder.MockRecorder) {
mockConfigurator := config.MockConfigurator{Conf: &config.Controller{
Report: true,
Interval: time.Hour,
Gather: []string{gather.AllGatherersConst},
}}
mockRecorder := recorder.MockRecorder{}

return New(&mockConfigurator, &mockRecorder, gatherers, nil), &mockRecorder
}