From df58ce63dac0863f8f354c19c9aacb12b6afe9df Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Wed, 17 Apr 2024 11:54:21 +0200 Subject: [PATCH] Finish K8s informers when the context is cancelled (#747) --- pkg/components/beyla.go | 6 ++-- pkg/internal/appolly/appolly.go | 28 ++++++++++--------- pkg/internal/discover/watcher_kube_test.go | 4 +-- pkg/internal/kube/informer.go | 15 ++++------ .../netolly/transform/k8s/informers.go | 14 ++++------ .../netolly/transform/k8s/kubernetes.go | 2 +- 6 files changed, 33 insertions(+), 36 deletions(-) diff --git a/pkg/components/beyla.go b/pkg/components/beyla.go index 3bb20a6f3..8ba522af4 100644 --- a/pkg/components/beyla.go +++ b/pkg/components/beyla.go @@ -51,12 +51,12 @@ func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyl // 1st process (privileged) - Invoke FindTarget, which also mounts the BPF maps // 2nd executable (unprivileged) - Invoke ReadAndForward, receiving the BPF map mountpoint as argument - instr := appolly.New(ctxInfo, config) - if err := instr.FindAndInstrument(ctx); err != nil { + instr := appolly.New(ctx, ctxInfo, config) + if err := instr.FindAndInstrument(); err != nil { slog.Error("Beyla couldn't find target process", "error", err) os.Exit(-1) } - if err := instr.ReadAndForward(ctx); err != nil { + if err := instr.ReadAndForward(); err != nil { slog.Error("Beyla couldn't start read and forwarding", "error", err) os.Exit(-1) } diff --git a/pkg/internal/appolly/appolly.go b/pkg/internal/appolly/appolly.go index 320d489c4..e4676509d 100644 --- a/pkg/internal/appolly/appolly.go +++ b/pkg/internal/appolly/appolly.go @@ -26,6 +26,7 @@ func log() *slog.Logger { // Instrumenter finds and instrument a service/process, and forwards the traces as // configured by the user type Instrumenter struct { + ctx context.Context config *beyla.Config ctxInfo *global.ContextInfo @@ -37,9 +38,10 @@ type Instrumenter struct { } // New Instrumenter, given a Config -func New(ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter { - setupFeatureContextInfo(ctxInfo, config) +func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter { + setupFeatureContextInfo(ctx, ctxInfo, config) return &Instrumenter{ + ctx: ctx, config: config, ctxInfo: ctxInfo, tracesInput: make(chan []request.Span, config.ChannelBufferLen), @@ -48,8 +50,8 @@ func New(ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter { // FindAndInstrument searches in background for any new executable matching the // selection criteria. -func (i *Instrumenter) FindAndInstrument(ctx context.Context) error { - finder := discover.NewProcessFinder(ctx, i.config, i.ctxInfo) +func (i *Instrumenter) FindAndInstrument() error { + finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo) foundProcesses, deletedProcesses, err := finder.Start() if err != nil { return fmt.Errorf("couldn't start Process Finder: %w", err) @@ -65,7 +67,7 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error { contexts := map[uint64]cancelCtx{} for { select { - case <-ctx.Done(): + case <-i.ctx.Done(): log.Debug("stopped searching for new processes to instrument") return case pt := <-foundProcesses: @@ -73,7 +75,7 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error { "inode", pt.ELFInfo.Ino, "pid", pt.ELFInfo.Pid, "exec", pt.ELFInfo.CmdExePath) cctx, ok := contexts[pt.ELFInfo.Ino] if !ok { - cctx.ctx, cctx.cancel = context.WithCancel(ctx) + cctx.ctx, cctx.cancel = context.WithCancel(i.ctx) contexts[pt.ELFInfo.Ino] = cctx } go pt.Run(cctx.ctx, i.tracesInput) @@ -93,34 +95,34 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error { // ReadAndForward keeps listening for traces in the BPF map, then reads, // processes and forwards them -func (i *Instrumenter) ReadAndForward(ctx context.Context) error { +func (i *Instrumenter) ReadAndForward() error { log := log() log.Debug("creating instrumentation pipeline") // TODO: when we split the executable, tracer should be reconstructed somehow // from this instance - bp, err := pipe.Build(ctx, i.config, i.ctxInfo, i.tracesInput) + bp, err := pipe.Build(i.ctx, i.config, i.ctxInfo, i.tracesInput) if err != nil { return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err) } log.Info("Starting main node") - bp.Run(ctx) + bp.Run(i.ctx) log.Info("exiting auto-instrumenter") return nil } -func setupFeatureContextInfo(ctxInfo *global.ContextInfo, config *beyla.Config) { +func setupFeatureContextInfo(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) { ctxInfo.AppO11y.ReportRoutes = config.Routes != nil - setupKubernetes(ctxInfo, &config.Attributes.Kubernetes) + setupKubernetes(ctx, ctxInfo, &config.Attributes.Kubernetes) } // setupKubernetes sets up common Kubernetes database and API clients that need to be accessed // from different stages in the Beyla pipeline -func setupKubernetes(ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDecorator) { +func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDecorator) { if !ctxInfo.K8sEnabled { return } @@ -141,7 +143,7 @@ func setupKubernetes(ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDe } ctxInfo.AppO11y.K8sInformer = &kube2.Metadata{} - if err := ctxInfo.AppO11y.K8sInformer.InitFromClient(kubeClient, k8sCfg.InformersSyncTimeout); err != nil { + if err := ctxInfo.AppO11y.K8sInformer.InitFromClient(ctx, kubeClient, k8sCfg.InformersSyncTimeout); err != nil { slog.Error("can't init Kubernetes informer. You can't setup Kubernetes discovery and your"+ " traces won't be decorated with Kubernetes metadata", "error", err) ctxInfo.AppO11y.K8sInformer = nil diff --git a/pkg/internal/discover/watcher_kube_test.go b/pkg/internal/discover/watcher_kube_test.go index bce95f9da..b1ed59c1e 100644 --- a/pkg/internal/discover/watcher_kube_test.go +++ b/pkg/internal/discover/watcher_kube_test.go @@ -72,7 +72,7 @@ func TestWatcherKubeEnricher(t *testing.T) { // Setup a fake K8s API connected to the watcherKubeEnricher k8sClient := fakek8sclientset.NewSimpleClientset() informer := kube.Metadata{} - require.NoError(t, informer.InitFromClient(k8sClient, 30*time.Minute)) + require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute)) wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)() require.NoError(t, err) inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10) @@ -118,7 +118,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { // Setup a fake K8s API connected to the watcherKubeEnricher k8sClient := fakek8sclientset.NewSimpleClientset() informer := kube.Metadata{} - require.NoError(t, informer.InitFromClient(k8sClient, 30*time.Minute)) + require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute)) wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)() require.NoError(t, err) pipeConfig := beyla.Config{} diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go index ba460321a..8d05764f7 100644 --- a/pkg/internal/kube/informer.go +++ b/pkg/internal/kube/informer.go @@ -42,7 +42,6 @@ type Metadata struct { pods cache.SharedIndexInformer replicaSets cache.SharedIndexInformer - stopChan chan struct{} containerEventHandlers []ContainerEventHandler } @@ -91,7 +90,7 @@ var replicaSetIndexer = cache.Indexers{ }, } -// GetContainerPod fetches metadata from a Pod given the name of one of its containera +// GetContainerPod fetches metadata from a Pod given the name of one of its containers func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) { objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID) if err != nil { @@ -254,11 +253,9 @@ func (k *Metadata) initReplicaSetInformer(informerFactory informers.SharedInform return nil } -func (k *Metadata) InitFromClient(client kubernetes.Interface, timeout time.Duration) error { +func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error { // Initialization variables - k.stopChan = make(chan struct{}) - - return k.initInformers(client, timeout) + return k.initInformers(ctx, client, timeout) } func LoadConfig(kubeConfigPath string) (*rest.Config, error) { @@ -288,7 +285,7 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) { return config, nil } -func (k *Metadata) initInformers(client kubernetes.Interface, timeout time.Duration) error { +func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error { informerFactory := informers.NewSharedInformerFactory(client, syncTime) err := k.initPodInformer(informerFactory) if err != nil { @@ -301,10 +298,10 @@ func (k *Metadata) initInformers(client kubernetes.Interface, timeout time.Durat log := klog() log.Debug("starting kubernetes informers, waiting for syncronization") - informerFactory.Start(k.stopChan) + informerFactory.Start(ctx.Done()) finishedCacheSync := make(chan struct{}) go func() { - informerFactory.WaitForCacheSync(k.stopChan) + informerFactory.WaitForCacheSync(ctx.Done()) close(finishedCacheSync) }() select { diff --git a/pkg/internal/netolly/transform/k8s/informers.go b/pkg/internal/netolly/transform/k8s/informers.go index 4734f428f..40450d106 100644 --- a/pkg/internal/netolly/transform/k8s/informers.go +++ b/pkg/internal/netolly/transform/k8s/informers.go @@ -19,6 +19,7 @@ package k8s import ( + "context" "fmt" "log/slog" "net" @@ -56,7 +57,6 @@ type NetworkInformers struct { services cache.SharedIndexInformer // replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers replicaSets cache.SharedIndexInformer - stopChan chan struct{} } type Owner struct { @@ -316,11 +316,9 @@ func (k *NetworkInformers) initReplicaSetInformer(informerFactory informers.Shar return nil } -func (k *NetworkInformers) InitFromConfig(kubeConfigPath string, syncTimeout time.Duration) error { +func (k *NetworkInformers) InitFromConfig(ctx context.Context, kubeConfigPath string, syncTimeout time.Duration) error { k.log = slog.With("component", "kubernetes.NetworkInformers") // Initialization variables - k.stopChan = make(chan struct{}) - config, err := LoadConfig(kubeConfigPath) if err != nil { return err @@ -331,7 +329,7 @@ func (k *NetworkInformers) InitFromConfig(kubeConfigPath string, syncTimeout tim return err } - err = k.initInformers(kubeClient, syncTimeout) + err = k.initInformers(ctx, kubeClient, syncTimeout) if err != nil { return err } @@ -366,7 +364,7 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) { return config, nil } -func (k *NetworkInformers) initInformers(client kubernetes.Interface, syncTimeout time.Duration) error { +func (k *NetworkInformers) initInformers(ctx context.Context, client kubernetes.Interface, syncTimeout time.Duration) error { if syncTimeout <= 0 { syncTimeout = defaultSyncTimeout } @@ -389,8 +387,8 @@ func (k *NetworkInformers) initInformers(client kubernetes.Interface, syncTimeou } slog.Debug("starting kubernetes informers, waiting for syncronization") - informerFactory.Start(k.stopChan) - informerFactory.WaitForCacheSync(k.stopChan) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) slog.Debug("kubernetes informers started") return nil diff --git a/pkg/internal/netolly/transform/k8s/kubernetes.go b/pkg/internal/netolly/transform/k8s/kubernetes.go index 183888b6d..ba4b8b4f2 100644 --- a/pkg/internal/netolly/transform/k8s/kubernetes.go +++ b/pkg/internal/netolly/transform/k8s/kubernetes.go @@ -181,7 +181,7 @@ func newDecorator(ctx context.Context, cfg *transform.KubernetesDecorator) (*dec } } - if err := nt.kube.InitFromConfig(cfg.KubeconfigPath, cfg.InformersSyncTimeout); err != nil { + if err := nt.kube.InitFromConfig(ctx, cfg.KubeconfigPath, cfg.InformersSyncTimeout); err != nil { return nil, err } return &nt, nil