Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1948966: One-off gather #389

Merged
merged 6 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions docs/gather-job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# One-off gather Job for the Insights Operator: runs a single gather
# pass on a master node and keeps the results in an emptyDir volume.
apiVersion: batch/v1
kind: Job
metadata:
  name: insights-operator-job
  annotations:
    # Have the cluster-wide proxy settings injected into the container.
    config.openshift.io/inject-proxy: insights-operator
spec:
  backoffLimit: 3
  # Clean the finished Job up automatically after 10 minutes.
  ttlSecondsAfterFinished: 600
  template:
    spec:
      restartPolicy: OnFailure
      serviceAccountName: operator
      # Pin the pod to a Linux master node.
      nodeSelector:
        beta.kubernetes.io/os: linux
        node-role.kubernetes.io/master: ""
      tolerations:
      - effect: NoSchedule
        key: node-role.kubernetes.io/master
        operator: Exists
      - effect: NoExecute
        key: node.kubernetes.io/unreachable
        operator: Exists
        tolerationSeconds: 900
      - effect: NoExecute
        key: node.kubernetes.io/not-ready
        operator: Exists
        tolerationSeconds: 900
0sewa0 marked this conversation as resolved.
Show resolved Hide resolved
volumes:
- name: snapshots
emptyDir: {}
- name: service-ca-bundle
configMap:
name: service-ca-bundle
optional: true
initContainers:
- name: insights-operator
image: quay.io/openshift/origin-insights-operator:latest
0sewa0 marked this conversation as resolved.
Show resolved Hide resolved
terminationMessagePolicy: FallbackToLogsOnError
volumeMounts:
- name: snapshots
mountPath: /var/lib/insights-operator
- name: service-ca-bundle
mountPath: /var/run/configmaps/service-ca-bundle
readOnly: true
ports:
- containerPort: 8443
name: https
resources:
requests:
cpu: 10m
memory: 70Mi
args:
- gather
- -v=4
- --config=/etc/insights-operator/server.yaml
containers:
- name: sleepy
image: busybox
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this image available in the cluster registry, or does it need to be downloaded? I guess the latter — and that would be a bit problematic in an air-gapped cluster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be worth mentioning it in the docs for air-gapped users, but I don't know of any simpler way to run anything on OpenShift :/

Copy link
Contributor

@quarckster quarckster May 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really could be problematic and cause errors, because Docker Hub has a limit on anonymous pulls. The safest way would be to use the same quay.io/openshift/origin-insights-operator image.

args:
- /bin/sh
- -c
- sleep 10m
volumeMounts: [{name: snapshots, mountPath: /var/lib/insights-operator}]
38 changes: 21 additions & 17 deletions pkg/cmd/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,32 @@ func runGather(operator controller.GatherJob, cfg *controllercmd.ControllerComma
}
operator.Controller = cont

kubeConfigPath := cmd.Flags().Lookup("kubeconfig").Value.String()
if len(kubeConfigPath) == 0 {
klog.Fatalf("error: --kubeconfig is required")
}
kubeConfigBytes, err := ioutil.ReadFile(kubeConfigPath)
if err != nil {
klog.Fatal(err)
}
kubeConfig, err := clientcmd.NewClientConfigFromBytes(kubeConfigBytes)
if err != nil {
klog.Fatal(err)
}
clientConfig, err := kubeConfig.ClientConfig()
if err != nil {
klog.Fatal(err)
var kubeConfig *rest.Config
if kubeConfigPath := cmd.Flags().Lookup("kubeconfig").Value.String(); len(kubeConfigPath) > 0 {
0sewa0 marked this conversation as resolved.
Show resolved Hide resolved
kubeConfigBytes, err := ioutil.ReadFile(kubeConfigPath)
if err != nil {
klog.Fatal(err)
}
config, err := clientcmd.NewClientConfigFromBytes(kubeConfigBytes)
if err != nil {
klog.Fatal(err)
}
kubeConfig, err = config.ClientConfig()
if err != nil {
klog.Fatal(err)
}
} else {
kubeConfig, err = rest.InClusterConfig()
if err != nil {
klog.Fatal(err)
}
}
protoConfig := rest.CopyConfig(clientConfig)
protoConfig := rest.CopyConfig(kubeConfig)
protoConfig.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
protoConfig.ContentType = "application/vnd.kubernetes.protobuf"

ctx, cancel := context.WithTimeout(context.Background(), operator.Interval)
err = operator.Gather(ctx, clientConfig, protoConfig)
err = operator.Gather(ctx, kubeConfig, protoConfig)
if err != nil {
klog.Fatal(err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package controller

const (
	// metricCAFile is the service CA bundle used to verify TLS when
	// connecting to the in-cluster Prometheus metrics endpoint.
	metricCAFile = "/var/run/configmaps/service-ca-bundle/service-ca.crt"

	// metricHost is the in-cluster Prometheus endpoint the metrics
	// gatherer scrapes.
	metricHost = "https://prometheus-k8s.openshift-monitoring.svc:9091"
)
21 changes: 10 additions & 11 deletions pkg/controller/gather_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type GatherJob struct {
// 3. Initiates the recorder
// 4. Executes a Gather
// 5. Flushes the results
func (d *GatherJob) Gather(ctx context.Context, kubeConfig *rest.Config, protoKubeConfig *rest.Config) error {
func (d *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *rest.Config) error {
klog.Infof("Starting insights-operator %s", version.Get().String())
// these are operator clients
kubeClient, err := kubernetes.NewForConfig(protoKubeConfig)
Expand All @@ -51,15 +51,15 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig *rest.Config, protoKu

// the metrics client will connect to prometheus and scrape a small set of metrics
metricsGatherKubeConfig := rest.CopyConfig(kubeConfig)
metricsGatherKubeConfig.CAFile = "/var/run/configmaps/service-ca-bundle/service-ca.crt"
metricsGatherKubeConfig.CAFile = metricCAFile
metricsGatherKubeConfig.NegotiatedSerializer = scheme.Codecs
metricsGatherKubeConfig.GroupVersion = &schema.GroupVersion{}
metricsGatherKubeConfig.APIPath = "/"
metricsGatherKubeConfig.Host = "https://prometheus-k8s.openshift-monitoring.svc:9091"
metricsGatherKubeConfig.Host = metricHost

// ensure the insight snapshot directory exists
if _, err := os.Stat(d.StoragePath); err != nil && os.IsNotExist(err) {
if err := os.MkdirAll(d.StoragePath, 0777); err != nil {
if _, err = os.Stat(d.StoragePath); err != nil && os.IsNotExist(err) {
if err = os.MkdirAll(d.StoragePath, 0777); err != nil {
return fmt.Errorf("can't create --path: %v", err)
}
}
Expand All @@ -69,7 +69,8 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig *rest.Config, protoKu

var anonymizer *anonymization.Anonymizer
if anonymization.IsObfuscationEnabled(configObserver) {
configClient, err := configv1client.NewForConfig(kubeConfig)
var configClient *configv1client.ConfigV1Client
configClient, err = configv1client.NewForConfig(kubeConfig)
if err != nil {
return err
}
Expand All @@ -82,17 +83,15 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig *rest.Config, protoKu

// the recorder stores the collected data and we flush at the end.
recdriver := diskrecorder.New(d.StoragePath)
recorder := recorder.New(recdriver, d.Interval, anonymizer)
defer recorder.Flush()
rec := recorder.New(recdriver, d.Interval, anonymizer)
defer rec.Flush()

// the gatherers check the state of the cluster and report any
// config to the recorder
clusterConfigGatherer := clusterconfig.New(gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, anonymizer)
err = clusterConfigGatherer.Gather(ctx, configObserver.Config().Gather, recorder)
err = clusterConfigGatherer.Gather(ctx, configObserver.Config().Gather, rec)
if err != nil {
return err
}

klog.Warning("stopped")
0sewa0 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
24 changes: 12 additions & 12 deletions pkg/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
// TODO: the oauth-proxy and delegating authorizer do not support Impersonate-User,
// so we do not impersonate gather
metricsGatherKubeConfig := rest.CopyConfig(controller.KubeConfig)
metricsGatherKubeConfig.CAFile = "/var/run/configmaps/service-ca-bundle/service-ca.crt"
metricsGatherKubeConfig.CAFile = metricCAFile
metricsGatherKubeConfig.NegotiatedSerializer = scheme.Codecs
metricsGatherKubeConfig.GroupVersion = &schema.GroupVersion{}
metricsGatherKubeConfig.APIPath = "/"
metricsGatherKubeConfig.Host = "https://prometheus-k8s.openshift-monitoring.svc:9091"
metricsGatherKubeConfig.Host = metricHost

// If we fail, it's likely due to the service CA not existing yet. Warn and continue,
// and when the service-ca is loaded we will be restarted.
Expand All @@ -90,8 +90,8 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
return err
}
// ensure the insight snapshot directory exists
if _, err := os.Stat(s.StoragePath); err != nil && os.IsNotExist(err) {
if err := os.MkdirAll(s.StoragePath, 0777); err != nil {
if _, err = os.Stat(s.StoragePath); err != nil && os.IsNotExist(err) {
if err = os.MkdirAll(s.StoragePath, 0777); err != nil {
return fmt.Errorf("can't create --path: %v", err)
}
}
Expand All @@ -117,26 +117,26 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
// the recorder periodically flushes any recorded data to disk as tar.gz files
// in s.StoragePath, and also prunes files above a certain age
recdriver := diskrecorder.New(s.StoragePath)
recorder := recorder.New(recdriver, s.Interval, anonymizer)
go recorder.PeriodicallyPrune(ctx, statusReporter)
rec := recorder.New(recdriver, s.Interval, anonymizer)
go rec.PeriodicallyPrune(ctx, statusReporter)

// the gatherers periodically check the state of the cluster and report any
// config to the recorder
clusterConfigGatherer := clusterconfig.New(
gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, anonymizer,
)
periodic := periodic.New(configObserver, recorder, map[string]gather.Interface{
periodicGather := periodic.New(configObserver, rec, map[string]gather.Interface{
"clusterconfig": clusterConfigGatherer,
})
statusReporter.AddSources(periodic.Sources()...)
statusReporter.AddSources(periodicGather.Sources()...)

// check we can read IO container status and we are not in crash loop
err = wait.PollImmediate(20*time.Second, wait.Jitter(s.Controller.Interval/24, 0.1), isRunning(ctx, gatherKubeConfig))
if err != nil {
initialDelay = wait.Jitter(s.Controller.Interval/12, 0.5)
klog.Infof("Unable to check insights-operator pod status. Setting initial delay to %s", initialDelay)
}
go periodic.Run(ctx.Done(), initialDelay)
go periodicGather.Run(ctx.Done(), initialDelay)

authorizer := clusterauthorizer.New(configObserver)
insightsClient := insightsclient.New(nil, 0, "default", authorizer, gatherKubeConfig)
Expand Down Expand Up @@ -171,13 +171,13 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
return nil
}

func isRunning(ctx context.Context, config *rest.Config) wait.ConditionFunc {
func isRunning(ctx context.Context, kubeConfig *rest.Config) wait.ConditionFunc {
return func() (bool, error) {
c, err := corev1client.NewForConfig(config)
c, err := corev1client.NewForConfig(kubeConfig)
if err != nil {
return false, err
}
// check if context hasn't been cancelled or done meanwhile
// check if context hasn't been canceled or done meanwhile
err = ctx.Err()
if err != nil {
return false, err
Expand Down