Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish K8s informers when the context is cancelled #747

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyl
// 1st process (privileged) - Invoke FindTarget, which also mounts the BPF maps
// 2nd executable (unprivileged) - Invoke ReadAndForward, receiving the BPF map mountpoint as argument

instr := appolly.New(ctxInfo, config)
if err := instr.FindAndInstrument(ctx); err != nil {
instr := appolly.New(ctx, ctxInfo, config)
if err := instr.FindAndInstrument(); err != nil {
slog.Error("Beyla couldn't find target process", "error", err)
os.Exit(-1)
}
if err := instr.ReadAndForward(ctx); err != nil {
if err := instr.ReadAndForward(); err != nil {
slog.Error("Beyla couldn't start read and forwarding", "error", err)
os.Exit(-1)
}
Expand Down
28 changes: 15 additions & 13 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func log() *slog.Logger {
// Instrumenter finds and instrument a service/process, and forwards the traces as
// configured by the user
type Instrumenter struct {
ctx context.Context
config *beyla.Config
ctxInfo *global.ContextInfo

Expand All @@ -37,9 +38,10 @@ type Instrumenter struct {
}

// New Instrumenter, given a Config
func New(ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter {
setupFeatureContextInfo(ctxInfo, config)
func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter {
setupFeatureContextInfo(ctx, ctxInfo, config)
return &Instrumenter{
ctx: ctx,
config: config,
ctxInfo: ctxInfo,
tracesInput: make(chan []request.Span, config.ChannelBufferLen),
Expand All @@ -48,8 +50,8 @@ func New(ctxInfo *global.ContextInfo, config *beyla.Config) *Instrumenter {

// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
finder := discover.NewProcessFinder(ctx, i.config, i.ctxInfo)
func (i *Instrumenter) FindAndInstrument() error {
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo)
foundProcesses, deletedProcesses, err := finder.Start()
if err != nil {
return fmt.Errorf("couldn't start Process Finder: %w", err)
Expand All @@ -65,15 +67,15 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
contexts := map[uint64]cancelCtx{}
for {
select {
case <-ctx.Done():
case <-i.ctx.Done():
log.Debug("stopped searching for new processes to instrument")
return
case pt := <-foundProcesses:
log.Debug("running tracer for new process",
"inode", pt.ELFInfo.Ino, "pid", pt.ELFInfo.Pid, "exec", pt.ELFInfo.CmdExePath)
cctx, ok := contexts[pt.ELFInfo.Ino]
if !ok {
cctx.ctx, cctx.cancel = context.WithCancel(ctx)
cctx.ctx, cctx.cancel = context.WithCancel(i.ctx)
contexts[pt.ELFInfo.Ino] = cctx
}
go pt.Run(cctx.ctx, i.tracesInput)
Expand All @@ -93,34 +95,34 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {

// ReadAndForward keeps listening for traces in the BPF map, then reads,
// processes and forwards them
func (i *Instrumenter) ReadAndForward(ctx context.Context) error {
func (i *Instrumenter) ReadAndForward() error {
log := log()
log.Debug("creating instrumentation pipeline")

// TODO: when we split the executable, tracer should be reconstructed somehow
// from this instance
bp, err := pipe.Build(ctx, i.config, i.ctxInfo, i.tracesInput)
bp, err := pipe.Build(i.ctx, i.config, i.ctxInfo, i.tracesInput)
if err != nil {
return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err)
}

log.Info("Starting main node")

bp.Run(ctx)
bp.Run(i.ctx)

log.Info("exiting auto-instrumenter")

return nil
}

func setupFeatureContextInfo(ctxInfo *global.ContextInfo, config *beyla.Config) {
func setupFeatureContextInfo(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) {
ctxInfo.AppO11y.ReportRoutes = config.Routes != nil
setupKubernetes(ctxInfo, &config.Attributes.Kubernetes)
setupKubernetes(ctx, ctxInfo, &config.Attributes.Kubernetes)
}

// setupKubernetes sets up common Kubernetes database and API clients that need to be accessed
// from different stages in the Beyla pipeline
func setupKubernetes(ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDecorator) {
func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDecorator) {
if !ctxInfo.K8sEnabled {
return
}
Expand All @@ -141,7 +143,7 @@ func setupKubernetes(ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDe
}

ctxInfo.AppO11y.K8sInformer = &kube2.Metadata{}
if err := ctxInfo.AppO11y.K8sInformer.InitFromClient(kubeClient, k8sCfg.InformersSyncTimeout); err != nil {
if err := ctxInfo.AppO11y.K8sInformer.InitFromClient(ctx, kubeClient, k8sCfg.InformersSyncTimeout); err != nil {
slog.Error("can't init Kubernetes informer. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.AppO11y.K8sInformer = nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWatcherKubeEnricher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(k8sClient, 30*time.Minute))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(k8sClient, 30*time.Minute))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)()
require.NoError(t, err)
pipeConfig := beyla.Config{}
Expand Down
15 changes: 6 additions & 9 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Metadata struct {
pods cache.SharedIndexInformer
replicaSets cache.SharedIndexInformer

stopChan chan struct{}
containerEventHandlers []ContainerEventHandler
}

Expand Down Expand Up @@ -91,7 +90,7 @@ var replicaSetIndexer = cache.Indexers{
},
}

// GetContainerPod fetches metadata from a Pod given the name of one of its containera
// GetContainerPod fetches metadata from a Pod given the name of one of its containers
func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {
objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID)
if err != nil {
Expand Down Expand Up @@ -254,11 +253,9 @@ func (k *Metadata) initReplicaSetInformer(informerFactory informers.SharedInform
return nil
}

func (k *Metadata) InitFromClient(client kubernetes.Interface, timeout time.Duration) error {
func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
// Initialization variables
k.stopChan = make(chan struct{})

return k.initInformers(client, timeout)
return k.initInformers(ctx, client, timeout)
}

func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
Expand Down Expand Up @@ -288,7 +285,7 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *Metadata) initInformers(client kubernetes.Interface, timeout time.Duration) error {
func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
err := k.initPodInformer(informerFactory)
if err != nil {
Expand All @@ -301,10 +298,10 @@ func (k *Metadata) initInformers(client kubernetes.Interface, timeout time.Durat

log := klog()
log.Debug("starting kubernetes informers, waiting for syncronization")
informerFactory.Start(k.stopChan)
informerFactory.Start(ctx.Done())
finishedCacheSync := make(chan struct{})
go func() {
informerFactory.WaitForCacheSync(k.stopChan)
informerFactory.WaitForCacheSync(ctx.Done())
close(finishedCacheSync)
}()
select {
Expand Down
14 changes: 6 additions & 8 deletions pkg/internal/netolly/transform/k8s/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package k8s

import (
"context"
"fmt"
"log/slog"
"net"
Expand Down Expand Up @@ -56,7 +57,6 @@ type NetworkInformers struct {
services cache.SharedIndexInformer
// replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers
replicaSets cache.SharedIndexInformer
stopChan chan struct{}
}

type Owner struct {
Expand Down Expand Up @@ -316,11 +316,9 @@ func (k *NetworkInformers) initReplicaSetInformer(informerFactory informers.Shar
return nil
}

func (k *NetworkInformers) InitFromConfig(kubeConfigPath string, syncTimeout time.Duration) error {
func (k *NetworkInformers) InitFromConfig(ctx context.Context, kubeConfigPath string, syncTimeout time.Duration) error {
k.log = slog.With("component", "kubernetes.NetworkInformers")
// Initialization variables
k.stopChan = make(chan struct{})

config, err := LoadConfig(kubeConfigPath)
if err != nil {
return err
Expand All @@ -331,7 +329,7 @@ func (k *NetworkInformers) InitFromConfig(kubeConfigPath string, syncTimeout tim
return err
}

err = k.initInformers(kubeClient, syncTimeout)
err = k.initInformers(ctx, kubeClient, syncTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -366,7 +364,7 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *NetworkInformers) initInformers(client kubernetes.Interface, syncTimeout time.Duration) error {
func (k *NetworkInformers) initInformers(ctx context.Context, client kubernetes.Interface, syncTimeout time.Duration) error {
if syncTimeout <= 0 {
syncTimeout = defaultSyncTimeout
}
Expand All @@ -389,8 +387,8 @@ func (k *NetworkInformers) initInformers(client kubernetes.Interface, syncTimeou
}

slog.Debug("starting kubernetes informers, waiting for syncronization")
informerFactory.Start(k.stopChan)
informerFactory.WaitForCacheSync(k.stopChan)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
slog.Debug("kubernetes informers started")

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/netolly/transform/k8s/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func newDecorator(ctx context.Context, cfg *transform.KubernetesDecorator) (*dec
}
}

if err := nt.kube.InitFromConfig(cfg.KubeconfigPath, cfg.InformersSyncTimeout); err != nil {
if err := nt.kube.InitFromConfig(ctx, cfg.KubeconfigPath, cfg.InformersSyncTimeout); err != nil {
return nil, err
}
return &nt, nil
Expand Down
Loading