Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/k8sattributesprocessor] Fix memory leak #30842

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/goleak_k8sattribute.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sattributesprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix goroutine leak caused by error on startup

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30438, 30841]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 3 additions & 1 deletion processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type fakeClient struct {
Namespaces map[string]*kube.Namespace
Nodes map[string]*kube.Node
StopCh chan struct{}
passthroughMode bool
}

func selectors() (labels.Selector, fields.Selector) {
Expand All @@ -35,7 +36,7 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, passthroughMode bool) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
Expand All @@ -49,6 +50,7 @@ func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRu
NodeInformer: kube.NewFakeInformer(cs, "", ls, fs),
ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
passthroughMode: passthroughMode,
}, nil
}

Expand Down
12 changes: 10 additions & 2 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type WatchClient struct {
// A map containing ReplicaSets related data, used to associate them with resources.
// Key is replicaset uid
ReplicaSets map[string]*ReplicaSet

passthroughMode bool
}

// Extract replicaset name from the pod name. Pod name is created using
Expand All @@ -79,7 +81,7 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// New initializes a new k8s Client.
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet, passthroughMode bool) (Client, error) {
c := &WatchClient{
logger: logger,
Rules: rules,
Expand All @@ -89,8 +91,8 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
replicasetRegex: rRegex,
cronJobRegex: cronJobRegex,
stopCh: make(chan struct{}),
passthroughMode: passthroughMode,
}
go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)

c.Pods = map[PodIdentifier]*Pod{}
c.Namespaces = map[string]*Namespace{}
Expand Down Expand Up @@ -177,6 +179,12 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,

// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
func (c *WatchClient) Start() {
go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)

if c.passthroughMode {
return
}

_, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ func nodeAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj any)) {
}

func TestDefaultClientset(t *testing.T) {
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil)
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil, false)
assert.Error(t, err)
assert.Equal(t, "invalid authType for kubernetes: ", err.Error())
assert.Nil(t, c)

c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil)
c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil, false)
assert.NoError(t, err)
assert.NotNil(t, c)
}
Expand All @@ -166,6 +166,7 @@ func TestBadFilters(t *testing.T) {
NewFakeInformer,
NewFakeNamespaceInformer,
NewFakeReplicaSetInformer,
false,
)
assert.Error(t, err)
assert.Nil(t, c)
Expand Down Expand Up @@ -202,7 +203,7 @@ func TestConstructorErrors(t *testing.T) {
gotAPIConfig = c
return nil, fmt.Errorf("error creating k8s client")
}
c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil)
c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil, false)
assert.Nil(t, c)
assert.Error(t, err)
assert.Equal(t, "error creating k8s client", err.Error())
Expand Down Expand Up @@ -1959,7 +1960,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o
},
},
}
c, err := New(logger, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer)
c, err := New(logger, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false)
require.NoError(t, err)
return c.(*WatchClient), logs
}
Expand Down
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Client interface {
}

// ClientProvider defines a func type that returns a new Client.
type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet) (Client, error)
type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet, bool) (Client, error)

// APIClientsetProvider defines a func type that initializes and return a new kubernetes
// Clientset object.
Expand Down
17 changes: 17 additions & 0 deletions processor/k8sattributesprocessor/internal/kube/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kube

import (
"testing"

"go.uber.org/goleak"
)

// The IgnoreTopFunction call prevents catching the leak generated by opencensus
// defaultWorker.Start which at this time is part of the package's init call.
// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package observability

import (
"testing"

"go.uber.org/goleak"
)

// The IgnoreTopFunction call prevents catching the leak generated by opencensus
// defaultWorker.Start which at this time is part of the package's init call.
// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}
17 changes: 17 additions & 0 deletions processor/k8sattributesprocessor/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sattributesprocessor

import (
"testing"

"go.uber.org/goleak"
)

// The IgnoreTopFunction call prevents catching the leak generated by opencensus
// defaultWorker.Start which at this time is part of the package's init call.
// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}
12 changes: 7 additions & 5 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub
kubeClient = kube.New
}
if !kp.passthroughMode {
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil)
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil, false)
if err != nil {
return err
}
Expand All @@ -70,19 +70,21 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error
return nil
}
}
if !kp.passthroughMode {

if kp.kc != nil {
go kp.kc.Start()
}

return nil
}

func (kp *kubernetesprocessor) Shutdown(context.Context) error {
if kp.kc == nil {
return nil
}
if !kp.passthroughMode {
kp.kc.Stop()
}

kp.kc.Stop()

return nil
}

Expand Down
Loading
Loading