Keep docker & k8s pod annotations while they are needed #5084

Merged 1 commit on Sep 7, 2017
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -35,6 +35,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add support for `initContainers` in `add_kubernetes_metadata` processor. {issue}4825[4825]
- Fix the `/usr/bin/beatname` script to accept `-d "*"` as a parameter. {issue}5040[5040]
- Combine `fields.yml` properties when they are defined in different sources. {issue}5075[5075]
- Keep Docker & Kubernetes pod metadata after container dies while they are needed by processors. {pull}5084[5084]
- Fix `fields.yml` lookup when using `export template` with a custom `path.config` param. {issue}5089[5089]

*Auditbeat*
@@ -77,6 +78,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Don't start filebeat if external modules/prospectors config is wrong and reload is disabled {pull}5053[5053]
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]
- Changed the number of shards in the default configuration to 3. {issue}5095[5095]
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]

*Heartbeat*

23 changes: 19 additions & 4 deletions libbeat/docs/processors-using.asciidoc
@@ -562,6 +562,19 @@ processors:
lookup_fields: ["metricset.host"]
-------------------------------------------------------------------------------

The `add_kubernetes_metadata` processor has the following configuration settings:

`in_cluster`:: (Optional) Use in-cluster settings for the Kubernetes client; `true`
by default.
`host`:: (Optional) When `in_cluster` is `false`, use this host to connect to the
Kubernetes API.
`kube_config`:: (Optional) Use the given config file as configuration for the
Kubernetes client.
`default_indexers.enabled`:: (Optional) Enable/disable the default pod indexers, in
case you want to specify your own.
`default_matchers.enabled`:: (Optional) Enable/disable the default pod matchers, in
case you want to specify your own.

[[add-docker-metadata]]
=== Add Docker metadata

@@ -578,10 +591,10 @@ from Docker containers:
processors:
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
match_fields: ["system.process.cgroup.id"]
match_source: true
match_source_index: 4

#match_fields: ["system.process.cgroup.id"]
#match_source: true
#match_source_index: 4
#cleanup_timeout: 60
# To connect to Docker over TLS you must specify a client and CA certificate.
#ssl:
# certificate_authority: "/etc/pki/root/ca.pem"
@@ -600,3 +613,5 @@ It has the following settings:
`match_source_index`:: (Optional) Index in the source path split by / to look
for container id. It defaults to 4 to match
`/var/lib/docker/containers/<container_id>/*.log`
`cleanup_timeout`:: (Optional) Time of inactivity after which the metadata for a
container is cleaned up and forgotten; 60s by default.
@@ -79,7 +79,8 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) {
if event.Fields["source"] != nil {
event, err = d.sourceProcessor.Run(event)
if err != nil {
return nil, err
logp.Debug("docker", "Error while extracting container ID from source path: %v", err)
return event, nil
}
}
}
@@ -191,6 +191,8 @@ func (m *mockWatcher) Start() error {
return nil
}

func (m *mockWatcher) Stop() {}

func (m *mockWatcher) Container(ID string) *Container {
return m.containers[ID]
}
6 changes: 6 additions & 0 deletions libbeat/processors/add_docker_metadata/config.go
@@ -1,12 +1,18 @@
package add_docker_metadata

import "time"

// Config for docker processor
type Config struct {
Host string `config:"host"`
TLS *TLSConfig `config:"ssl"`
Fields []string `config:"match_fields"`
MatchSource bool `config:"match_source"`
SourceIndex int `config:"match_source_index"`

// Annotations are kept after the container is killed, until they have not been
// accessed for a full `cleanup_timeout`:
CleanupTimeout time.Duration `config:"cleanup_timeout"`
}

// TLSConfig for docker socket connection
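For illustration only (not part of this diff), a minimal sketch of how a `cleanup_timeout` setting would be unpacked into this struct, assuming libbeat's `common.NewConfigFrom`/`Unpack` helpers and go-ucfg's standard duration parsing; the simplified Config copy below is hypothetical:

package main

import (
	"fmt"
	"time"

	"github.com/elastic/beats/libbeat/common"
)

// Simplified copy of the processor config, for this sketch only.
type Config struct {
	CleanupTimeout time.Duration `config:"cleanup_timeout"`
}

func main() {
	// Equivalent of `cleanup_timeout: 90s` under the add_docker_metadata processor.
	raw, err := common.NewConfigFrom(map[string]interface{}{"cleanup_timeout": "90s"})
	if err != nil {
		panic(err)
	}

	cfg := Config{CleanupTimeout: 60 * time.Second} // default used when the setting is absent
	if err := raw.Unpack(&cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.CleanupTimeout) // 1m30s
}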
111 changes: 100 additions & 11 deletions libbeat/processors/add_docker_metadata/watcher.go
@@ -1,15 +1,17 @@
package add_docker_metadata

import (
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/docker/go-connections/tlsconfig"
"golang.org/x/net/context"

"github.com/elastic/beats/libbeat/logp"
)
@@ -22,6 +24,9 @@ type Watcher interface {
// Start watching docker API for new containers
Start() error

// Stop watching docker API for new containers
Stop()

// Container returns the running container with the given ID or nil if unknown
Container(ID string) *Container

@@ -30,11 +35,15 @@ type Watcher interface {
}

type watcher struct {
client *client.Client
sync.RWMutex
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
lastValidTimestamp int64
stopped sync.WaitGroup
}

// Container info retrieved by the watcher
@@ -45,6 +54,12 @@ type Container struct {
Labels map[string]string
}

// Client for docker interface
type Client interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
}
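As an aside, a minimal sketch (not part of this diff) of a fake implementation of this Client interface; a stub like this is what lets NewWatcherWithClient below be exercised without a running Docker daemon. The fakeClient name and its fields are hypothetical:

package add_docker_metadata

import (
	"context"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/events"
)

// fakeClient is a hypothetical stand-in for the Docker client: it satisfies the
// Client interface above so the watcher can be driven from a test.
type fakeClient struct {
	containers []types.Container   // canned answer for ContainerList
	events     chan events.Message // the test pushes Docker events here
	errors     chan error          // and failures here
}

func (f *fakeClient) ContainerList(_ context.Context, _ types.ContainerListOptions) ([]types.Container, error) {
	return f.containers, nil
}

func (f *fakeClient) Events(_ context.Context, _ types.EventsOptions) (<-chan events.Message, <-chan error) {
	return f.events, f.errors
}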

type WatcherConstructor func(host string, tls *TLSConfig) (Watcher, error)

// NewWatcher returns a watcher running for the given settings
@@ -69,28 +84,51 @@ func NewWatcher(host string, tls *TLSConfig) (Watcher, error) {
}
}

cli, err := client.NewClient(host, dockerAPIVersion, httpClient, nil)
client, err := client.NewClient(host, dockerAPIVersion, httpClient, nil)
if err != nil {
return nil, err
}

return NewWatcherWithClient(client, 60*time.Second)
}

func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
return &watcher{
client: cli,
ctx: ctx,
stop: cancel,
containers: make(map[string]*Container),
client: client,
ctx: ctx,
stop: cancel,
containers: make(map[string]*Container),
deleted: make(map[string]time.Time),
cleanupTimeout: cleanupTimeout,
}, nil
}

// Container returns the running container with the given ID or nil if unknown
func (w *watcher) Container(ID string) *Container {
return w.containers[ID]
w.RLock()
container := w.containers[ID]
Contributor: I'm aware the code was here before, but should we check here whether the ID actually exists?

Contributor (Author): We get nil if the container is not there, and we handle that case.

w.RUnlock()

// Update last access time if it's deleted
if _, ok := w.deleted[ID]; ok {
w.Lock()
w.deleted[ID] = time.Now()
w.Unlock()
}

return container
}

// Containers returns the list of known containers
func (w *watcher) Containers() map[string]*Container {
return w.containers
w.RLock()
defer w.RUnlock()
res := make(map[string]*Container)
for k, v := range w.containers {
res[k] = v
}
return res
}

// Start watching docker API for new containers
@@ -99,6 +137,8 @@ func (w *watcher) Start() error {
logp.Debug("docker", "Start docker containers scanner")
w.lastValidTimestamp = time.Now().Unix()

w.Lock()
defer w.Unlock()
containers, err := w.client.ContainerList(w.ctx, types.ContainerListOptions{})
if err != nil {
return err
@@ -113,11 +153,17 @@ func (w *watcher) Start() error {
}
}

w.stopped.Add(2)
go w.watch()
go w.cleanupWorker()

return nil
}

func (w *watcher) Stop() {
w.stop()
}

func (w *watcher) watch() {
filters := filters.NewArgs()
filters.Add("type", "container")
@@ -138,22 +184,30 @@ func (w *watcher) watch() {
w.lastValidTimestamp = event.Time

// Add / update
if event.Action == "create" || event.Action == "update" {
if event.Action == "start" || event.Action == "update" {
name := event.Actor.Attributes["name"]
image := event.Actor.Attributes["image"]
delete(event.Actor.Attributes, "name")
delete(event.Actor.Attributes, "image")

w.Lock()
w.containers[event.Actor.ID] = &Container{
ID: event.Actor.ID,
Name: name,
Image: image,
Labels: event.Actor.Attributes,
}

// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()
}

// Delete
if event.Action == "die" || event.Action == "kill" {
Reviewer: If a container dies and gets started again, it will still time out eventually and lose the metadata.

Contributor (Author): I think that case is handled, as onPodAdd (also called in case of update) clears any deleted flag: https://github.com/elastic/beats/pull/5084/files#diff-c211c9481fb346b82d8713e779e0be9fR141

Contributor (Author, @exekias, Sep 4, 2017): Sorry, you are right. I was talking about the Kubernetes watcher, not Docker; I will apply the same fix, thanks! I'm also working on tests.

Contributor (Author): pushed b54187e

Reviewer: When I restart a container, the events are: kill -> die -> stop -> start -> restart. So the container will not be un-deleted in this case. Looking at it, adding the metadata on "create" and removing it on "die" does seem inconsistent to me.

Contributor (Author): Let me do some tests; it looks like it would be better to add it on start, wouldn't it?

Reviewer: I think adding on start is the better approach.

delete(w.containers, event.Actor.ID)
w.Lock()
w.deleted[event.Actor.ID] = time.Now()
w.Unlock()
}

case err := <-errors:
@@ -164,8 +218,43 @@ func (w *watcher) watch() {

case <-w.ctx.Done():
logp.Debug("docker", "Watcher stopped")
w.stopped.Done()
return
}
}
}
}

// Clean up deleted containers after they are not used anymore
func (w *watcher) cleanupWorker() {
for {
// Wait a full period
time.Sleep(w.cleanupTimeout)

select {
case <-w.ctx.Done():
w.stopped.Done()
return
default:
// Check entries for timeout
var toDelete []string
timeout := time.Now().Add(-w.cleanupTimeout)
w.RLock()
for key, lastSeen := range w.deleted {
Contributor: Should we add some debug logging to this? Otherwise it's very hard to figure out what is still in memory and what was removed.

if lastSeen.Before(timeout) {
logp.Debug("docker", "Removing container %s after cool down timeout")
toDelete = append(toDelete, key)
}
}
w.RUnlock()

// Delete timed out entries:
w.Lock()
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
}
w.Unlock()
}
}
}
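As a usage illustration (not from this PR), here is a test-style sketch that drives the watcher through the new lifecycle using the hypothetical fakeClient sketched after the Client interface above, assumed to live in the same package. The behaviour being exercised: metadata indexed on a start event survives the die event and is only forgotten after a full cleanup_timeout of inactivity. Timings and names are illustrative only.

package add_docker_metadata

import (
	"testing"
	"time"

	"github.com/docker/docker/api/types/events"
	"github.com/stretchr/testify/assert"
)

func TestMetadataKeptAfterContainerDies(t *testing.T) {
	evs := make(chan events.Message, 10)
	errs := make(chan error)

	// Short timeout so the cleanup worker runs quickly in this sketch.
	w, err := NewWatcherWithClient(&fakeClient{events: evs, errors: errs}, 200*time.Millisecond)
	assert.NoError(t, err)
	assert.NoError(t, w.Start())
	defer w.Stop()

	// A container starts: the watcher indexes its metadata.
	evs <- events.Message{
		Action: "start",
		Time:   time.Now().Unix(),
		Actor: events.Actor{
			ID:         "cid",
			Attributes: map[string]string{"name": "test", "image": "busybox"},
		},
	}
	time.Sleep(50 * time.Millisecond)
	assert.NotNil(t, w.Container("cid"))

	// The container dies: metadata is flagged as deleted but still served,
	// so late log lines can still be enriched.
	evs <- events.Message{Action: "die", Time: time.Now().Unix(), Actor: events.Actor{ID: "cid"}}
	time.Sleep(50 * time.Millisecond)
	assert.NotNil(t, w.Container("cid"))

	// After a full cleanup_timeout without any lookup, the cleanup worker forgets it.
	time.Sleep(500 * time.Millisecond)
	assert.Nil(t, w.Container("cid"))
}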