Skip to content

Commit

Permalink
Fix leaks with metadata processors (elastic#16349)
Browse files Browse the repository at this point in the history
Add a closer interface for processors so their resources can be released
when the processor is not needed anymore.
Explicitly close publisher pipelines so their processors are closed.
Add closers for add_docker_metadata, add_kubernetes_metadata and add_process_metadata.
Script processor will fail if a processor that needs to be closed is used.

(cherry picked from commit a3fe796)
  • Loading branch information
jsoriano committed Oct 13, 2020
1 parent 858025d commit fd39818
Show file tree
Hide file tree
Showing 20 changed files with 427 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ field. You can revert this change by configuring tags for the module and omittin
- Explicitly detect missing variables in autodiscover configuration, log them at the debug level. {issue}20568[20568] {pull}20898[20898]
- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197]
- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21258[21258]
- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21259[21258]
- Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type ClientEventer interface {

type ProcessorList interface {
Processor
Close() error
All() []Processor
}

Expand Down
5 changes: 5 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
if err != nil {
return err
}
defer func() {
if err := b.processing.Close(); err != nil {
logp.Warn("Failed to close global processing: %v", err)
}
}()

// Windows: Mark service as stopped.
// After this is run, a Beat service is considered by the OS to be stopped
Expand Down
7 changes: 3 additions & 4 deletions libbeat/common/docker/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool
// Extra check to confirm that Docker is available
_, err = client.Info(context.Background())
if err != nil {
client.Close()
return nil, err
}

Expand Down Expand Up @@ -395,14 +396,12 @@ func (w *watcher) cleanupWorker() {
log := w.log

for {
// Wait a full period
time.Sleep(w.cleanupTimeout)

select {
case <-w.ctx.Done():
w.stopped.Done()
return
default:
// Wait a full period
case <-time.After(w.cleanupTimeout):
// Check entries for timeout
var toDelete []string
timeout := time.Now().Add(-w.cleanupTimeout)
Expand Down
12 changes: 12 additions & 0 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func (d *addDockerMetadata) Close() error {
if d.cgroups != nil {
d.cgroups.StopJanitor()
}
d.watcher.Stop()
err := processors.Close(d.sourceProcessor)
if err != nil {
return errors.Wrap(err, "closing source processor of add_docker_metadata")
}
return nil
}

func (d *addDockerMetadata) String() string {
return fmt.Sprintf("%v=[match_fields=[%v] match_pids=[%v]]",
processorName, strings.Join(d.fields, ", "), strings.Join(d.pidFields, ", "))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build linux darwin windows
// +build integration

package add_docker_metadata

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/docker"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
dockertest "github.com/elastic/beats/v7/libbeat/tests/docker"
"github.com/elastic/beats/v7/libbeat/tests/resources"
)

func TestAddDockerMetadata(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

client, err := docker.NewClient(defaultConfig().Host, nil, nil)
require.NoError(t, err)

// Docker clients can affect the goroutines checker because they keep
// idle keep-alive connections, so we explicitly close them.
// These idle connections in principle wouldn't represent leaks even if
// the client is not explicitly closed because they are eventually closed.
defer client.Close()

// Start a container to have some data to enrich events
testClient, err := dockertest.NewClient()
require.NoError(t, err)
// Explicitly close client to don't affect goroutines checker
defer testClient.Close()

image := "busybox"
cmd := []string{"sleep", "60"}
labels := map[string]string{"label": "foo"}
id, err := testClient.ContainerStart(image, cmd, labels)
require.NoError(t, err)
defer testClient.ContainerRemove(id)

info, err := testClient.ContainerInspect(id)
require.NoError(t, err)
pid := info.State.Pid

config, err := common.NewConfigFrom(map[string]interface{}{
"match_fields": []string{"cid"},
})
watcherConstructor := newWatcherWith(client)
processor, err := buildDockerMetadataProcessor(logp.L(), config, watcherConstructor)
require.NoError(t, err)

t.Run("match container by container id", func(t *testing.T) {
input := &beat.Event{Fields: common.MapStr{
"cid": id,
}}
result, err := processor.Run(input)
require.NoError(t, err)

resultLabels, _ := result.Fields.GetValue("container.labels")
expectedLabels := common.MapStr{"label": "foo"}
assert.Equal(t, expectedLabels, resultLabels)
assert.Equal(t, id, result.Fields["cid"])
})

t.Run("match container by process id", func(t *testing.T) {
input := &beat.Event{Fields: common.MapStr{
"cid": id,
"process.pid": pid,
}}
result, err := processor.Run(input)
require.NoError(t, err)

resultLabels, _ := result.Fields.GetValue("container.labels")
expectedLabels := common.MapStr{"label": "foo"}
assert.Equal(t, expectedLabels, resultLabels)
assert.Equal(t, id, result.Fields["cid"])
})

t.Run("don't enrich non existing container", func(t *testing.T) {
input := &beat.Event{Fields: common.MapStr{
"cid": "notexists",
}}
result, err := processor.Run(input)
require.NoError(t, err)
assert.Equal(t, input.Fields, result.Fields)
})

err = processors.Close(processor)
require.NoError(t, err)
}

func newWatcherWith(client docker.Client) docker.WatcherConstructor {
return func(log *logp.Logger, host string, tls *docker.TLSConfig, storeShortID bool) (docker.Watcher, error) {
return docker.NewWatcherWithClient(log, client, 60*time.Second, storeShortID)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

// +build linux darwin windows
// +build !integration

package add_docker_metadata

Expand Down
32 changes: 24 additions & 8 deletions libbeat/processors/add_kubernetes_metadata/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ type cache struct {
timeout time.Duration
deleted map[string]time.Time // key -> when should this obj be deleted
metadata map[string]common.MapStr
done chan struct{}
}

func newCache(cleanupTimeout time.Duration) *cache {
c := &cache{
timeout: cleanupTimeout,
deleted: make(map[string]time.Time),
metadata: make(map[string]common.MapStr),
done: make(chan struct{}),
}
go c.cleanup()
return c
Expand Down Expand Up @@ -67,15 +69,29 @@ func (c *cache) set(key string, data common.MapStr) {
}

func (c *cache) cleanup() {
ticker := time.Tick(timeout)
for now := range ticker {
c.Lock()
for k, t := range c.deleted {
if now.After(t) {
delete(c.deleted, k)
delete(c.metadata, k)
if timeout <= 0 {
return
}

ticker := time.NewTicker(timeout)
defer ticker.Stop()
for {
select {
case <-c.done:
return
case now := <-ticker.C:
c.Lock()
for k, t := range c.deleted {
if now.After(t) {
delete(c.deleted, k)
delete(c.metadata, k)
}
}
c.Unlock()
}
c.Unlock()
}
}

func (c *cache) stop() {
close(c.done)
}
10 changes: 10 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,16 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func (k *kubernetesAnnotator) Close() error {
if k.watcher != nil {
k.watcher.Stop()
}
if k.cache != nil {
k.cache.stop()
}
return nil
}

func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) {
metadata := k.indexers.GetMetadata(pod)
for _, m := range metadata {
Expand Down
46 changes: 34 additions & 12 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ var (
)

type addProcessMetadata struct {
config config
provider processMetadataProvider
cidProvider cidProvider
log *logp.Logger
mappings common.MapStr
config config
provider processMetadataProvider
cgroupsCache *common.Cache
cidProvider cidProvider
log *logp.Logger
mappings common.MapStr
}

type processMetadata struct {
Expand All @@ -81,16 +82,22 @@ type cidProvider interface {
}

func init() {
processors.RegisterPlugin(processorName, New)
processors.RegisterPlugin(processorName, NewWithCache)
jsprocessor.RegisterPlugin("AddProcessMetadata", New)
}

// New constructs a new add_process_metadata processor.
func New(cfg *common.Config) (processors.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache)
return newProcessMetadataProcessorWithProvider(cfg, &procCache, false)
}

func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider) (proc processors.Processor, err error) {
// NewWithCache construct a new add_process_metadata processor with cache for container IDs.
// Resulting processor implements `Close()` to release the cache resources.
func NewWithCache(cfg *common.Config) (processors.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache, true)
}

func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider, withCache bool) (proc processors.Processor, err error) {
// Logging (each processor instance has a unique ID).
var (
id = int(instanceID.Inc())
Expand Down Expand Up @@ -118,21 +125,25 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
}
// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if config.CgroupCacheExpireTime != 0 {
if withCache && config.CgroupCacheExpireTime != 0 {
p.log.Debug("Initializing cgroup cache")
evictionListener := func(k common.Key, v common.Value) {
p.log.Debugf("Evicted cached cgroups for PID=%v", k)
}

cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, cgroupsCache)
p.cgroupsCache = common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
p.cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, p.cgroupsCache)
} else {
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil)
}

}

if withCache {
return &addProcessMetadataCloser{p}, nil
}

return &p, nil
}

Expand Down Expand Up @@ -253,6 +264,17 @@ func (p *addProcessMetadata) getContainerID(pid int) (string, error) {
return cid, nil
}

type addProcessMetadataCloser struct {
addProcessMetadata
}

func (p *addProcessMetadataCloser) Close() error {
if p.addProcessMetadata.cgroupsCache != nil {
p.addProcessMetadata.cgroupsCache.StopJanitor()
}
return nil
}

// String returns the processor representation formatted as a string
func (p *addProcessMetadata) String() string {
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestAddProcessMetadata(t *testing.T) {
t.Fatal(err)
}

proc, err := newProcessMetadataProcessorWithProvider(config, testProcs)
proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true)
if test.initErr == nil {
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit fd39818

Please sign in to comment.