diff --git a/docs/sources/configure/options.md b/docs/sources/configure/options.md index 209f77f598..5bcb35b47b 100644 --- a/docs/sources/configure/options.md +++ b/docs/sources/configure/options.md @@ -396,6 +396,16 @@ expression. If other selectors are specified in the same `services` entry, the processes to be selected need to match all the selector properties. +| YAML | Env var | Type | Default | +| ---------------- | ------- | --------------------------- | ------- | +| `k8s_pod_labels` | -- | map\[string\]string (regular expression) | (unset) | + +This selector property will limit the instrumentation to the applications +running in the Pods having labels with keys matching the provided value as regular expression. + +If other selectors are specified in the same `services` entry, the processes to be +selected need to match all the selector properties. + ## EBPF tracer YAML section `ebpf`. diff --git a/pkg/internal/discover/matcher.go b/pkg/internal/discover/matcher.go index 5e20e7a170..62be627fdc 100644 --- a/pkg/internal/discover/matcher.go +++ b/pkg/internal/discover/matcher.go @@ -84,7 +84,7 @@ func (m *matcher) filterCreated(obj processAttrs) (Event[ProcessMatch], bool) { } for i := range m.criteria { if m.matchProcess(&obj, proc, &m.criteria[i]) { - m.log.Debug("found process", "pid", proc.Pid, "comm", proc.ExePath, "metadata", obj.metadata) + m.log.Debug("found process", "pid", proc.Pid, "comm", proc.ExePath, "metadata", obj.metadata, "podLabels", obj.podLabels) m.processHistory[obj.pid] = proc return Event[ProcessMatch]{ Type: EventCreated, @@ -133,7 +133,7 @@ func (m *matcher) matchProcess(obj *processAttrs, p *services.ProcessInfo, a *se // after matching by process basic information, we check if it matches // by metadata. // If there is no metadata, this will return true. - return m.matchByAttributes(obj.metadata, a.Metadata) + return m.matchByAttributes(obj, a) } func (m *matcher) matchByPort(p *services.ProcessInfo, a *services.Attributes) bool { @@ -152,9 +152,24 @@ func (m *matcher) matchByExecutable(p *services.ProcessInfo, a *services.Attribu return a.PathRegexp.MatchString(p.ExePath) } -func (m *matcher) matchByAttributes(actual map[string]string, required map[string]*services.RegexpAttr) bool { - for attrName, criteriaRegexp := range required { - if attrValue, ok := actual[attrName]; !ok || !criteriaRegexp.MatchString(attrValue) { +func (m *matcher) matchByAttributes(actual *processAttrs, required *services.Attributes) bool { + if required == nil { + return true + } + if actual == nil { + return false + } + + // match metadata + for attrName, criteriaRegexp := range required.Metadata { + if attrValue, ok := actual.metadata[attrName]; !ok || !criteriaRegexp.MatchString(attrValue) { + return false + } + } + + // match pod labels + for labelName, criteriaRegexp := range required.PodLabels { + if actualPodLabelValue, ok := actual.podLabels[labelName]; !ok || !criteriaRegexp.MatchString(actualPodLabelValue) { return false } } @@ -188,7 +203,7 @@ func FindingCriteria(cfg *beyla.Config) services.DefinitionCriteria { // any executable in the matched k8s entities for i := range finderCriteria { fc := &finderCriteria[i] - if !fc.Path.IsSet() && fc.OpenPorts.Len() == 0 && len(fc.Metadata) > 0 { + if !fc.Path.IsSet() && fc.OpenPorts.Len() == 0 && (len(fc.Metadata) > 0 || len(fc.PodLabels) > 0) { // match any executable path if err := fc.Path.UnmarshalText([]byte(".")); err != nil { panic("bug! " + err.Error()) diff --git a/pkg/internal/discover/watcher_kube.go b/pkg/internal/discover/watcher_kube.go index 7d97adbb76..42511b6bb4 100644 --- a/pkg/internal/discover/watcher_kube.go +++ b/pkg/internal/discover/watcher_kube.go @@ -300,6 +300,7 @@ func withMetadata(pp processAttrs, info *kube.PodInfo) processAttrs { services.AttrNamespace: info.Namespace, services.AttrPodName: info.Name, } + ret.podLabels = info.Labels owner := info.Owner for owner != nil { ret.metadata[services.AttrOwnerName] = owner.Name diff --git a/pkg/internal/discover/watcher_kube_test.go b/pkg/internal/discover/watcher_kube_test.go index 11140079a2..77ba59db07 100644 --- a/pkg/internal/discover/watcher_kube_test.go +++ b/pkg/internal/discover/watcher_kube_test.go @@ -45,7 +45,7 @@ func TestWatcherKubeEnricher(t *testing.T) { newProcess(inputCh, containerPID, []uint32{containerPort}) } var pod = func(t *testing.T, _ chan []Event[processAttrs], k8sClient *fakek8sclientset.Clientset) { - deployPod(t, k8sClient, namespace, podName, containerID) + deployPod(t, k8sClient, namespace, podName, containerID, nil) } var ownedPod = func(t *testing.T, _ chan []Event[processAttrs], k8sClient *fakek8sclientset.Clientset) { deployOwnedPod(t, k8sClient, namespace, podName, replicaSetName, containerID) @@ -136,6 +136,13 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { - name: both open_ports: 443 k8s_deployment_name: chacha + - name: pod-label-only + k8s_pod_labels: + instrument: "beyla" + - name: pod-multi-label-only + k8s_pod_labels: + instrument: "ebpf" + lang: "go.*" `), &pipeConfig)) mtchNodeFunc, err := CriteriaMatcherProvider(CriteriaMatcher{Cfg: &pipeConfig}) require.NoError(t, err) @@ -168,7 +175,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { t.Run("metadata-only match", func(t *testing.T) { newProcess(inputCh, 34, []uint32{8080}) - deployPod(t, k8sClient, namespace, "chichi", "container-34") + deployPod(t, k8sClient, namespace, "chichi", "container-34", nil) matches := testutil.ReadChannel(t, outputCh, timeout) require.Len(t, matches, 1) m := matches[0] @@ -177,6 +184,28 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { assert.EqualValues(t, 34, m.Obj.Process.Pid) }) + t.Run("pod-label-only match", func(t *testing.T) { + newProcess(inputCh, 42, []uint32{8080}) + deployPod(t, k8sClient, namespace, "labeltest", "container-42", map[string]string{"instrument": "beyla"}) + matches := testutil.ReadChannel(t, outputCh, timeout) + require.Len(t, matches, 1) + m := matches[0] + assert.Equal(t, EventCreated, m.Type) + assert.Equal(t, "pod-label-only", m.Obj.Criteria.Name) + assert.EqualValues(t, 42, m.Obj.Process.Pid) + }) + + t.Run("pod-multi-label-only match", func(t *testing.T) { + newProcess(inputCh, 43, []uint32{8080}) + deployPod(t, k8sClient, namespace, "multi-labeltest", "container-43", map[string]string{"instrument": "ebpf", "lang": "golang"}) + matches := testutil.ReadChannel(t, outputCh, timeout) + require.Len(t, matches, 1) + m := matches[0] + assert.Equal(t, EventCreated, m.Type) + assert.Equal(t, "pod-multi-label-only", m.Obj.Criteria.Name) + assert.EqualValues(t, 43, m.Obj.Process.Pid) + }) + t.Run("both process and metadata match", func(t *testing.T) { newProcess(inputCh, 56, []uint32{443}) deployOwnedPod(t, k8sClient, namespace, "pod-56", "rs-56", "container-56") @@ -218,12 +247,13 @@ func newProcess(inputCh chan []Event[processAttrs], pid PID, ports []uint32) { }} } -func deployPod(t *testing.T, k8sClient *fakek8sclientset.Clientset, ns, name, containerID string) { +func deployPod(t *testing.T, k8sClient *fakek8sclientset.Clientset, ns, name, containerID string, labels map[string]string) { t.Helper() _, err := k8sClient.CoreV1().Pods(ns).Create( context.Background(), &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: ns, + Labels: labels, }, Status: corev1.PodStatus{ ContainerStatuses: []corev1.ContainerStatus{{ ContainerID: containerID, diff --git a/pkg/internal/discover/watcher_proc.go b/pkg/internal/discover/watcher_proc.go index fef786c846..45020338b1 100644 --- a/pkg/internal/discover/watcher_proc.go +++ b/pkg/internal/discover/watcher_proc.go @@ -49,6 +49,7 @@ type processAttrs struct { pid PID openPorts []uint32 metadata map[string]string + podLabels map[string]string } func wplog() *slog.Logger { diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go index 617415fffb..ffec8caac0 100644 --- a/pkg/internal/kube/informer.go +++ b/pkg/internal/kube/informer.go @@ -147,6 +147,7 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto Name: pod.Name, Namespace: pod.Namespace, UID: pod.UID, + Labels: pod.Labels, }, Owner: owner, NodeName: pod.Spec.NodeName, diff --git a/pkg/services/criteria.go b/pkg/services/criteria.go index c4f150ad33..c5eb1113bb 100644 --- a/pkg/services/criteria.go +++ b/pkg/services/criteria.go @@ -74,7 +74,8 @@ func (dc DefinitionCriteria) Validate() error { if dc[i].OpenPorts.Len() == 0 && !dc[i].Path.IsSet() && !dc[i].PathRegexp.IsSet() && - len(dc[i].Metadata) == 0 { + len(dc[i].Metadata) == 0 && + len(dc[i].PodLabels) == 0 { return fmt.Errorf("discovery.services[%d] should define at least one selection criteria", i) } for k := range dc[i].Metadata { @@ -115,6 +116,9 @@ type Attributes struct { // Metadata stores other attributes, such as Kubernetes object metadata Metadata map[string]*RegexpAttr `yaml:",inline"` + + // PodLabels allows matching against the labels of a pod + PodLabels map[string]*RegexpAttr `yaml:"k8s_pod_labels"` } // PortEnum defines an enumeration of ports. It allows defining a set of single ports as well a set of