Skip to content

Commit

Permalink
K8S: Adding ignored pod names as config parameter (#3520)
Browse files Browse the repository at this point in the history
* Add ignore config parameter

* Add ignoredPodNames method

* Fix IgnoredPodNames to contain regexp

* Fix doc and regexp default

* Refactor config entry and WithIgnoredPodNames method

* Refactor ignoredPods to exclude

* Rename Exclude functions and structs

* Refactor Exclude struct
  • Loading branch information
euniceek authored Jun 16, 2021
1 parent dc2f4e8 commit 5ccdbe0
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 16 deletions.
2 changes: 1 addition & 1 deletion processor/k8sprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) {
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
Expand Down
14 changes: 14 additions & 0 deletions processor/k8sprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type Config struct {
// Association section allows to define rules for tagging spans, metrics,
// and logs with Pod metadata.
Association []PodAssociationConfig `mapstructure:"pod_association"`

// Exclude section allows to define names of pod that should be
// ignored while tagging.
Exclude ExcludeConfig `mapstructure:"exclude"`
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -197,3 +201,13 @@ type PodAssociationConfig struct {
// e.g. ip, pod_uid, k8s.pod.ip
Name string `mapstructure:"name"`
}

// ExcludeConfig represent a list of Pods to exclude
type ExcludeConfig struct {
Pods []ExcludePodConfig `mapstructure:"pods"`
}

// ExcludePodConfig represent a Pod name to ignore
type ExcludePodConfig struct {
Name string `mapstructure:"name"`
}
7 changes: 7 additions & 0 deletions processor/k8sprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestLoadConfig(t *testing.T) {
&Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
Exclude: ExcludeConfig{Pods: []ExcludePodConfig{{Name: "jaeger-agent"}, {Name: "jaeger-collector"}}},
})

p1 := cfg.Processors[config.NewIDWithName(typeStr, "2")]
Expand Down Expand Up @@ -103,5 +104,11 @@ func TestLoadConfig(t *testing.T) {
Name: "k8s.pod.uid",
},
},
Exclude: ExcludeConfig{
Pods: []ExcludePodConfig{
{Name: "jaeger-agent"},
{Name: "jaeger-collector"},
},
},
})
}
4 changes: 4 additions & 0 deletions processor/k8sprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (

var kubeClientProvider = kube.ClientProvider(nil)
var consumerCapabilities = consumer.Capabilities{MutatesData: true}
var defaultExcludes = ExcludeConfig{Pods: []ExcludePodConfig{{Name: "jaeger-agent"}, {Name: "jaeger-collector"}}}

// NewFactory returns a new factory for the k8s processor.
func NewFactory() component.ProcessorFactory {
Expand All @@ -52,6 +53,7 @@ func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
Exclude: defaultExcludes,
}
}

Expand Down Expand Up @@ -194,6 +196,8 @@ func createProcessorOpts(cfg config.Processor) []Option {

opts = append(opts, WithExtractPodAssociations(oCfg.Association...))

opts = append(opts, WithExcludes(oCfg.Exclude))

return opts
}

Expand Down
10 changes: 6 additions & 4 deletions processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type WatchClient struct {
Rules ExtractionRules
Filters Filters
Associations []Association
Exclude Excludes

// A map containing Namespace related data, used to associate them with resources.
// Key is namespace name
Expand All @@ -63,12 +64,13 @@ type WatchClient struct {
var dRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]*-[0-9a-zA-Z]*$`)

// New initializes a new k8s Client.
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace) (Client, error) {
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace) (Client, error) {
c := &WatchClient{
logger: logger,
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
deploymentRegex: dRegex,
stopCh: make(chan struct{}),
}
Expand Down Expand Up @@ -443,9 +445,9 @@ func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool {
}
}

// Check well known names that should be ignored
for _, rexp := range podNameIgnorePatterns {
if rexp.MatchString(pod.Name) {
// Check if user requested the pod to be ignored through configuration
for _, excludedPod := range c.Exclude.Pods {
if excludedPod.Name.MatchString(pod.Name) {
return true
}
}
Expand Down
29 changes: 25 additions & 4 deletions processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func namespaceAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj in
}

func TestDefaultClientset(t *testing.T) {
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, nil, nil, nil)
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil)
assert.Error(t, err)
assert.Equal(t, "invalid authType for kubernetes: ", err.Error())
assert.Nil(t, c)

c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, newFakeAPIClientset, nil, nil)
c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, c)
}
Expand All @@ -129,6 +129,7 @@ func TestBadFilters(t *testing.T) {
ExtractionRules{},
Filters{Fields: []FieldFilter{{Op: selection.Exists}}},
[]Association{},
Excludes{},
newFakeAPIClientset,
NewFakeInformer,
NewFakeNamespaceInformer,
Expand Down Expand Up @@ -168,7 +169,7 @@ func TestConstructorErrors(t *testing.T) {
gotAPIConfig = c
return nil, fmt.Errorf("error creating k8s client")
}
c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer)
c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer)
assert.Nil(t, c)
assert.Error(t, err)
assert.Equal(t, err.Error(), "error creating k8s client")
Expand Down Expand Up @@ -723,6 +724,20 @@ func TestPodIgnorePatterns(t *testing.T) {
Name: "jaeger-collector",
},
},
}, {
ignore: true,
pod: api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "jaeger-agent-b2zdv",
},
},
}, {
ignore: false,
pod: api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "test-pod-name",
},
},
},
}

Expand Down Expand Up @@ -883,7 +898,13 @@ func TestExtractNamespaceLabelsAnnotations(t *testing.T) {
func newTestClientWithRulesAndFilters(t *testing.T, e ExtractionRules, f Filters) (*WatchClient, *observer.ObservedLogs) {
observedLogger, logs := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)
c, err := New(logger, k8sconfig.APIConfig{}, e, f, []Association{}, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer)
exclude := Excludes{
Pods: []ExcludePods{
{Name: regexp.MustCompile(`jaeger-agent`)},
{Name: regexp.MustCompile(`jaeger-collector`)},
},
}
c, err := New(logger, k8sconfig.APIConfig{}, e, f, []Association{}, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer)
require.NoError(t, err)
return c.(*WatchClient), logs
}
Expand Down
16 changes: 11 additions & 5 deletions processor/k8sprocessor/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ type PodIdentifier string

var (
// TODO: move these to config with default values
podNameIgnorePatterns = []*regexp.Regexp{
regexp.MustCompile(`jaeger-agent`),
regexp.MustCompile(`jaeger-collector`),
}
defaultPodDeleteGracePeriod = time.Second * 120
watchSyncPeriod = time.Minute * 5
)
Expand All @@ -59,7 +55,7 @@ type Client interface {
}

// ClientProvider defines a func type that returns a new Client.
type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, APIClientsetProvider, InformerProvider, InformerProviderNamespace) (Client, error)
type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace) (Client, error)

// APIClientsetProvider defines a func type that initializes and return a new kubernetes
// Clientset object.
Expand Down Expand Up @@ -161,3 +157,13 @@ type Association struct {
From string
Name string
}

// Excludes represent a list of Pods to ignore
type Excludes struct {
Pods []ExcludePods
}

// ExcludePods represent a Pod name to ignore
type ExcludePods struct {
Name *regexp.Regexp
}
17 changes: 17 additions & 0 deletions processor/k8sprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,20 @@ func WithExtractPodAssociations(podAssociations ...PodAssociationConfig) Option
return nil
}
}

// WithExcludes allows specifying pods to exclude
func WithExcludes(podExclude ExcludeConfig) Option {
return func(p *kubernetesprocessor) error {
ignoredNames := kube.Excludes{}
names := podExclude.Pods

if len(names) == 0 {
names = []ExcludePodConfig{{Name: "jaeger-agent"}, {Name: "jaeger-collector"}}
}
for _, name := range names {
ignoredNames.Pods = append(ignoredNames.Pods, kube.ExcludePods{Name: regexp.MustCompile(name.Name)})
}
p.podIgnore = ignoredNames
return nil
}
}
42 changes: 42 additions & 0 deletions processor/k8sprocessor/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,3 +651,45 @@ func TestWithExtractPodAssociation(t *testing.T) {
})
}
}

func TestWithExcludes(t *testing.T) {
tests := []struct {
name string
args ExcludeConfig
want kube.Excludes
}{
{
"default",
ExcludeConfig{},
kube.Excludes{
Pods: []kube.ExcludePods{
{Name: regexp.MustCompile(`jaeger-agent`)},
{Name: regexp.MustCompile(`jaeger-collector`)},
},
},
},
{
"configured",
ExcludeConfig{
Pods: []ExcludePodConfig{
{Name: "ignore_pod1"},
{Name: "ignore_pod2"},
},
},
kube.Excludes{
Pods: []kube.ExcludePods{
{Name: regexp.MustCompile(`ignore_pod1`)},
{Name: regexp.MustCompile(`ignore_pod2`)},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &kubernetesprocessor{}
option := WithExcludes(tt.args)
option(p)
assert.Equal(t, tt.want, p.podIgnore)
})
}
}
3 changes: 2 additions & 1 deletion processor/k8sprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ type kubernetesprocessor struct {
rules kube.ExtractionRules
filters kube.Filters
podAssociations []kube.Association
podIgnore kube.Excludes
}

func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kube.ClientProvider) error {
if kubeClient == nil {
kubeClient = kube.New
}
if !kp.passthroughMode {
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, nil, nil, nil)
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion processor/k8sprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestProcessorBadConfig(t *testing.T) {
}

func TestProcessorBadClientProvider(t *testing.T) {
clientProvider := func(_ *zap.Logger, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) {
clientProvider := func(_ *zap.Logger, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) {
return nil, fmt.Errorf("bad client error")
}

Expand Down
6 changes: 6 additions & 0 deletions processor/k8sprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ processors:
name: ip
- from: resource_attribute
name: k8s.pod.uid

exclude:
pods:
- name: jaeger-agent
- name: jaeger-collector


exporters:
nop:
Expand Down

0 comments on commit 5ccdbe0

Please sign in to comment.