Skip to content

Commit

Permalink
Finish K8s informers when the context is cancelled (grafana#747)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Apr 17, 2024
1 parent d925da0 commit df58ce6
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 36 deletions.
6 changes: 3 additions & 3 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyl
// 1st process (privileged) - Invoke FindTarget, which also mounts the BPF maps
// 2nd executable (unprivileged) - Invoke ReadAndForward, receiving the BPF map mountpoint as argument

instr := appolly.New(ctxInfo, config)
if err := instr.FindAndInstrument(ctx); err != nil {
instr := appolly.New(ctx, ctxInfo, config)
if err := instr.FindAndInstrument(); err != nil {
slog.Error("Beyla couldn't find target process", "error", err)
os.Exit(-1)
}
if err := instr.ReadAndForward(ctx); err != nil {
if err := instr.ReadAndForward(); err != nil {
slog.Error("Beyla couldn't start read and forwarding", "error", err)
os.Exit(-1)
}
Expand Down
28 changes: 15 additions & 13 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func log() *slog.Logger {
// Instrumenter finds and instrument a service/process, and forwards the traces as
// configured by the user
type Instrumenter struct {
ctx context.Context
config *beyla.Config
ctxInfo *global.ContextInfo

Expand All @@ -37,9 +38,10 @@ type Instrumenter struct {
}

// New Instrumenter, given a Config
func New(ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter {
setupFeatureContextInfo(ctxInfo, config)
func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter {
setupFeatureContextInfo(ctx, ctxInfo, config)
return &Instrumenter{
ctx: ctx,
config: config,
ctxInfo: ctxInfo,
tracesInput: make(chan []request.Span, config.ChannelBufferLen),
Expand All @@ -48,8 +50,8 @@ func New(ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter {

// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
finder := discover.NewProcessFinder(ctx, i.config, i.ctxInfo)
func (i *Instrumenter) FindAndInstrument() error {
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo)
foundProcesses, deletedProcesses, err := finder.Start()
if err != nil {
return fmt.Errorf("couldn't start Process Finder: %w", err)
Expand All @@ -65,15 +67,15 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
contexts := map[uint64]cancelCtx{}
for {
select {
case <-ctx.Done():
case <-i.ctx.Done():
log.Debug("stopped searching for new processes to instrument")
return
case pt := <-foundProcesses:
log.Debug("running tracer for new process",
"inode", pt.ELFInfo.Ino, "pid", pt.ELFInfo.Pid, "exec", pt.ELFInfo.CmdExePath)
cctx, ok := contexts[pt.ELFInfo.Ino]
if !ok {
cctx.ctx, cctx.cancel = context.WithCancel(ctx)
cctx.ctx, cctx.cancel = context.WithCancel(i.ctx)
contexts[pt.ELFInfo.Ino] = cctx
}
go pt.Run(cctx.ctx, i.tracesInput)
Expand All @@ -93,34 +95,34 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {

// ReadAndForward keeps listening for traces in the BPF map, then reads,
// processes and forwards them
func (i *Instrumenter) ReadAndForward(ctx context.Context) error {
func (i *Instrumenter) ReadAndForward() error {
log := log()
log.Debug("creating instrumentation pipeline")

// TODO: when we split the executable, tracer should be reconstructed somehow
// from this instance
bp, err := pipe.Build(ctx, i.config, i.ctxInfo, i.tracesInput)
bp, err := pipe.Build(i.ctx, i.config, i.ctxInfo, i.tracesInput)
if err != nil {
return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err)
}

log.Info("Starting main node")

bp.Run(ctx)
bp.Run(i.ctx)

log.Info("exiting auto-instrumenter")

return nil
}

func setupFeatureContextInfo(ctxInfo *global.ContextInfo, config *beyla.Config) {
func setupFeatureContextInfo(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) {
ctxInfo.AppO11y.ReportRoutes = config.Routes != nil
setupKubernetes(ctxInfo, &config.Attributes.Kubernetes)
setupKubernetes(ctx, ctxInfo, &config.Attributes.Kubernetes)
}

// setupKubernetes sets up common Kubernetes database and API clients that need to be accessed
// from different stages in the Beyla pipeline
func setupKubernetes(ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDecorator) {
func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDecorator) {
if !ctxInfo.K8sEnabled {
return
}
Expand All @@ -141,7 +143,7 @@ func setupKubernetes(ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDe
}

ctxInfo.AppO11y.K8sInformer = &kube2.Metadata{}
if err := ctxInfo.AppO11y.K8sInformer.InitFromClient(kubeClient, k8sCfg.InformersSyncTimeout); err != nil {
if err := ctxInfo.AppO11y.K8sInformer.InitFromClient(ctx, kubeClient, k8sCfg.InformersSyncTimeout); err != nil {
slog.Error("can't init Kubernetes informer. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.AppO11y.K8sInformer = nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWatcherKubeEnricher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(k8sClient, 30*time.Minute))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(k8sClient, 30*time.Minute))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)()
require.NoError(t, err)
pipeConfig := beyla.Config{}
Expand Down
15 changes: 6 additions & 9 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Metadata struct {
pods cache.SharedIndexInformer
replicaSets cache.SharedIndexInformer

stopChan chan struct{}
containerEventHandlers []ContainerEventHandler
}

Expand Down Expand Up @@ -91,7 +90,7 @@ var replicaSetIndexer = cache.Indexers{
},
}

// GetContainerPod fetches metadata from a Pod given the name of one of its containera
// GetContainerPod fetches metadata from a Pod given the name of one of its containers
func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {
objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID)
if err != nil {
Expand Down Expand Up @@ -254,11 +253,9 @@ func (k *Metadata) initReplicaSetInformer(informerFactory informers.SharedInform
return nil
}

func (k *Metadata) InitFromClient(client kubernetes.Interface, timeout time.Duration) error {
func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
// Initialization variables
k.stopChan = make(chan struct{})

return k.initInformers(client, timeout)
return k.initInformers(ctx, client, timeout)
}

func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
Expand Down Expand Up @@ -288,7 +285,7 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *Metadata) initInformers(client kubernetes.Interface, timeout time.Duration) error {
func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
err := k.initPodInformer(informerFactory)
if err != nil {
Expand All @@ -301,10 +298,10 @@ func (k *Metadata) initInformers(client kubernetes.Interface, timeout time.Durat

log := klog()
log.Debug("starting kubernetes informers, waiting for syncronization")
informerFactory.Start(k.stopChan)
informerFactory.Start(ctx.Done())
finishedCacheSync := make(chan struct{})
go func() {
informerFactory.WaitForCacheSync(k.stopChan)
informerFactory.WaitForCacheSync(ctx.Done())
close(finishedCacheSync)
}()
select {
Expand Down
14 changes: 6 additions & 8 deletions pkg/internal/netolly/transform/k8s/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package k8s

import (
"context"
"fmt"
"log/slog"
"net"
Expand Down Expand Up @@ -56,7 +57,6 @@ type NetworkInformers struct {
services cache.SharedIndexInformer
// replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers
replicaSets cache.SharedIndexInformer
stopChan chan struct{}
}

type Owner struct {
Expand Down Expand Up @@ -316,11 +316,9 @@ func (k *NetworkInformers) initReplicaSetInformer(informerFactory informers.Shar
return nil
}

func (k *NetworkInformers) InitFromConfig(kubeConfigPath string, syncTimeout time.Duration) error {
func (k *NetworkInformers) InitFromConfig(ctx context.Context, kubeConfigPath string, syncTimeout time.Duration) error {
k.log = slog.With("component", "kubernetes.NetworkInformers")
// Initialization variables
k.stopChan = make(chan struct{})

config, err := LoadConfig(kubeConfigPath)
if err != nil {
return err
Expand All @@ -331,7 +329,7 @@ func (k *NetworkInformers) InitFromConfig(kubeConfigPath string, syncTimeout tim
return err
}

err = k.initInformers(kubeClient, syncTimeout)
err = k.initInformers(ctx, kubeClient, syncTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -366,7 +364,7 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *NetworkInformers) initInformers(client kubernetes.Interface, syncTimeout time.Duration) error {
func (k *NetworkInformers) initInformers(ctx context.Context, client kubernetes.Interface, syncTimeout time.Duration) error {
if syncTimeout <= 0 {
syncTimeout = defaultSyncTimeout
}
Expand All @@ -389,8 +387,8 @@ func (k *NetworkInformers) initInformers(client kubernetes.Interface, syncTimeou
}

slog.Debug("starting kubernetes informers, waiting for syncronization")
informerFactory.Start(k.stopChan)
informerFactory.WaitForCacheSync(k.stopChan)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
slog.Debug("kubernetes informers started")

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/netolly/transform/k8s/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func newDecorator(ctx context.Context, cfg *transform.KubernetesDecorator) (*dec
}
}

if err := nt.kube.InitFromConfig(cfg.KubeconfigPath, cfg.InformersSyncTimeout); err != nil {
if err := nt.kube.InitFromConfig(ctx, cfg.KubeconfigPath, cfg.InformersSyncTimeout); err != nil {
return nil, err
}
return &nt, nil
Expand Down

0 comments on commit df58ce6

Please sign in to comment.