From 7d3314e952622140ca1b04b993ab8664b1e82893 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?=
Date: Sat, 2 Sep 2017 01:12:01 +0200
Subject: [PATCH] Keep docker & k8s pod annotations while they are needed

In some cases pod annotations are needed after the pod is deleted, for
instance when Filebeat is still reading the logs left behind by the
container. This change makes sure we keep metadata after a pod is gone.
By storing access times we ensure that it stays available for as long as
it is being used.
---
 CHANGELOG.asciidoc                                 |   1 +
 libbeat/docs/processors-using.asciidoc             |  23 +-
 .../add_docker_metadata.go                         |   3 +-
 .../add_docker_metadata_test.go                    |   2 +
 .../processors/add_docker_metadata/config.go       |   6 +
 .../processors/add_docker_metadata/watcher.go      | 111 ++++++++-
 .../add_docker_metadata/watcher_test.go            | 212 ++++++++++++++++++
 .../add_kubernetes_metadata/config.go              |  14 +-
 .../add_kubernetes_metadata/kubernetes.go          |   2 +-
 .../add_kubernetes_metadata/podwatcher.go          |  79 ++++++-
 10 files changed, 419 insertions(+), 34 deletions(-)
 create mode 100644 libbeat/processors/add_docker_metadata/watcher_test.go

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b4f50d532925..b310a9f5970f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -35,6 +35,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
 - Add support for `initContainers` in `add_kubernetes_metadata` processor. {issue}4825[4825]
 - Fix the `/usr/bin/beatname` script to accept `-d "*"` as a parameter. {issue}5040[5040]
 - Combine `fields.yml` properties when they are defined in different sources. {issue}5075[5075]
+- Keep Docker & Kubernetes pod metadata after the container dies, for as long as it is needed by processors. {pull}5084[5084]
 - Fix `fields.yml` lookup when using `export template` with a custom `path.config` param. {issue}5089[5089]
 
 *Auditbeat*
diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc
index 7fc0c14dd0cd..e655cd5e2d9f 100644
--- a/libbeat/docs/processors-using.asciidoc
+++ b/libbeat/docs/processors-using.asciidoc
@@ -562,6 +562,19 @@
 processors:
 - add_kubernetes_metadata:
     lookup_fields: ["metricset.host"]
 -------------------------------------------------------------------------------
+The `add_kubernetes_metadata` processor has the following configuration settings:
+
+`in_cluster`:: (Optional) Use in-cluster settings for the Kubernetes client,
+`true` by default.
+`host`:: (Optional) In case `in_cluster` is false, use this host to connect to
+the Kubernetes API.
+`kube_config`:: (Optional) Use the given config file as configuration for the
+Kubernetes client.
+`default_indexers.enabled`:: (Optional) Enable/disable default pod indexers, in
+case you want to specify your own.
+`default_matchers.enabled`:: (Optional) Enable/disable default pod matchers, in
+case you want to specify your own.
+
 [[add-docker-metadata]]
 === Add Docker metadata
@@ -578,10 +591,10 @@ from Docker containers:
 processors:
 - add_docker_metadata:
     host: "unix:///var/run/docker.sock"
-    match_fields: ["system.process.cgroup.id"]
-    match_source: true
-    match_source_index: 4
-
+    #match_fields: ["system.process.cgroup.id"]
+    #match_source: true
+    #match_source_index: 4
+    #cleanup_timeout: 60
     # To connect to Docker over TLS you must specify a client and CA certificate.
     #ssl:
     #  certificate_authority: "/etc/pki/root/ca.pem"
@@ -600,3 +613,5 @@ It has the following settings:
 `match_source_index`:: (Optional) Index in the source path split by / to look
 for container id. It defaults to 4 to match
 `/var/lib/docker/containers/<container_id>/*.log`
+`cleanup_timeout`:: (Optional) Time of inactivity before metadata for a
+container is cleaned up and forgotten, 60s by default.
diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go
index a90046318e4b..024630150f19 100644
--- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go
+++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go
@@ -79,7 +79,8 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) {
 		if event.Fields["source"] != nil {
 			event, err = d.sourceProcessor.Run(event)
 			if err != nil {
-				return nil, err
+				logp.Debug("docker", "Error while extracting container ID from source path: %v", err)
+				return event, nil
 			}
 		}
 	}
diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
index dea7da0c7382..cebd43ee3af1 100644
--- a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
+++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go
@@ -191,6 +191,8 @@ func (m *mockWatcher) Start() error {
 	return nil
 }
 
+func (m *mockWatcher) Stop() {}
+
 func (m *mockWatcher) Container(ID string) *Container {
 	return m.containers[ID]
 }
diff --git a/libbeat/processors/add_docker_metadata/config.go b/libbeat/processors/add_docker_metadata/config.go
index 0efcef7e6ec7..77fb02b49a52 100644
--- a/libbeat/processors/add_docker_metadata/config.go
+++ b/libbeat/processors/add_docker_metadata/config.go
@@ -1,5 +1,7 @@
 package add_docker_metadata
 
+import "time"
+
 // Config for docker processor
 type Config struct {
 	Host        string   `config:"host"`
@@ -7,6 +9,10 @@ type Config struct {
 	Fields      []string `config:"match_fields"`
 	MatchSource bool     `config:"match_source"`
 	SourceIndex int      `config:"match_source_index"`
+
+	// Metadata for a container is kept after it is killed, until it has not
+	// been accessed for a full `cleanup_timeout`:
+	CleanupTimeout time.Duration `config:"cleanup_timeout"`
 }
 
 // TLSConfig for docker socket connection
diff --git a/libbeat/processors/add_docker_metadata/watcher.go b/libbeat/processors/add_docker_metadata/watcher.go
index 607dec7f2ec7..4cfee779bc91 100644
--- a/libbeat/processors/add_docker_metadata/watcher.go
+++ b/libbeat/processors/add_docker_metadata/watcher.go
@@ -1,15 +1,17 @@
 package add_docker_metadata
 
 import (
-	"context"
 	"fmt"
 	"net/http"
+	"sync"
 	"time"
 
 	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/events"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/client"
 	"github.com/docker/go-connections/tlsconfig"
+	"golang.org/x/net/context"
 
 	"github.com/elastic/beats/libbeat/logp"
 )
@@ -22,6 +24,9 @@ type Watcher interface {
 	// Start watching docker API for new containers
 	Start() error
 
+	// 
Stop watching docker API for new containers + Stop() + // Container returns the running container with the given ID or nil if unknown Container(ID string) *Container @@ -30,11 +35,15 @@ type Watcher interface { } type watcher struct { - client *client.Client + sync.RWMutex + client Client ctx context.Context stop context.CancelFunc containers map[string]*Container + deleted map[string]time.Time // deleted annotations key -> last access time + cleanupTimeout time.Duration lastValidTimestamp int64 + stopped sync.WaitGroup } // Container info retrieved by the watcher @@ -45,6 +54,12 @@ type Container struct { Labels map[string]string } +// Client for docker interface +type Client interface { + ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) + Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) +} + type WatcherConstructor func(host string, tls *TLSConfig) (Watcher, error) // NewWatcher returns a watcher running for the given settings @@ -69,28 +84,51 @@ func NewWatcher(host string, tls *TLSConfig) (Watcher, error) { } } - cli, err := client.NewClient(host, dockerAPIVersion, httpClient, nil) + client, err := client.NewClient(host, dockerAPIVersion, httpClient, nil) if err != nil { return nil, err } + return NewWatcherWithClient(client, 60*time.Second) +} + +func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher, error) { ctx, cancel := context.WithCancel(context.Background()) return &watcher{ - client: cli, - ctx: ctx, - stop: cancel, - containers: make(map[string]*Container), + client: client, + ctx: ctx, + stop: cancel, + containers: make(map[string]*Container), + deleted: make(map[string]time.Time), + cleanupTimeout: cleanupTimeout, }, nil } // Container returns the running container with the given ID or nil if unknown func (w *watcher) Container(ID string) *Container { - return w.containers[ID] + w.RLock() + container := w.containers[ID] + w.RUnlock() + + // Update last access time if it's deleted + if _, ok := w.deleted[ID]; ok { + w.Lock() + w.deleted[ID] = time.Now() + w.Unlock() + } + + return container } // Containers returns the list of known containers func (w *watcher) Containers() map[string]*Container { - return w.containers + w.RLock() + defer w.RUnlock() + res := make(map[string]*Container) + for k, v := range w.containers { + res[k] = v + } + return res } // Start watching docker API for new containers @@ -99,6 +137,8 @@ func (w *watcher) Start() error { logp.Debug("docker", "Start docker containers scanner") w.lastValidTimestamp = time.Now().Unix() + w.Lock() + defer w.Unlock() containers, err := w.client.ContainerList(w.ctx, types.ContainerListOptions{}) if err != nil { return err @@ -113,11 +153,17 @@ func (w *watcher) Start() error { } } + w.stopped.Add(2) go w.watch() + go w.cleanupWorker() return nil } +func (w *watcher) Stop() { + w.stop() +} + func (w *watcher) watch() { filters := filters.NewArgs() filters.Add("type", "container") @@ -138,22 +184,30 @@ func (w *watcher) watch() { w.lastValidTimestamp = event.Time // Add / update - if event.Action == "create" || event.Action == "update" { + if event.Action == "start" || event.Action == "update" { name := event.Actor.Attributes["name"] image := event.Actor.Attributes["image"] delete(event.Actor.Attributes, "name") delete(event.Actor.Attributes, "image") + + w.Lock() w.containers[event.Actor.ID] = &Container{ ID: event.Actor.ID, Name: name, Image: image, Labels: event.Actor.Attributes, } + + // un-delete 
if it's flagged (in case of update or recreation)
+			delete(w.deleted, event.Actor.ID)
+			w.Unlock()
 		}
 
 		// Delete
 		if event.Action == "die" || event.Action == "kill" {
-			delete(w.containers, event.Actor.ID)
+			w.Lock()
+			w.deleted[event.Actor.ID] = time.Now()
+			w.Unlock()
 		}
 
 	case err := <-errors:
@@ -164,8 +218,43 @@
 		case <-w.ctx.Done():
 			logp.Debug("docker", "Watcher stopped")
+			w.stopped.Done()
 			return
 		}
 	}
 }
 }
+
+// Clean up deleted containers after they are no longer used
+func (w *watcher) cleanupWorker() {
+	for {
+		// Wait a full period
+		time.Sleep(w.cleanupTimeout)
+
+		select {
+		case <-w.ctx.Done():
+			w.stopped.Done()
+			return
+		default:
+			// Check entries for timeout
+			var toDelete []string
+			timeout := time.Now().Add(-w.cleanupTimeout)
+			w.RLock()
+			for key, lastSeen := range w.deleted {
+				if lastSeen.Before(timeout) {
+					logp.Debug("docker", "Removing container %s after cool down timeout", key)
+					toDelete = append(toDelete, key)
+				}
+			}
+			w.RUnlock()
+
+			// Delete timed out entries:
+			w.Lock()
+			for _, key := range toDelete {
+				delete(w.deleted, key)
+				delete(w.containers, key)
+			}
+			w.Unlock()
+		}
+	}
+}
diff --git a/libbeat/processors/add_docker_metadata/watcher_test.go b/libbeat/processors/add_docker_metadata/watcher_test.go
new file mode 100644
index 000000000000..3508722649a5
--- /dev/null
+++ b/libbeat/processors/add_docker_metadata/watcher_test.go
@@ -0,0 +1,212 @@
+package add_docker_metadata
+
+import (
+	"testing"
+	"time"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/events"
+	"github.com/stretchr/testify/assert"
+	"golang.org/x/net/context"
+)
+
+type MockClient struct {
+	// containers to return on ContainerList call
+	containers []types.Container
+	// event list to send on Events call
+	events []interface{}
+
+	done chan interface{}
+}
+
+func (m *MockClient) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
+	return m.containers, nil
+}
+
+func (m *MockClient) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
+	eventsC := make(chan events.Message)
+	errorsC := make(chan error)
+
+	go func() {
+		for _, event := range m.events {
+			switch e := event.(type) {
+			case events.Message:
+				eventsC <- e
+			case error:
+				errorsC <- e
+			}
+		}
+		close(m.done)
+	}()
+
+	return eventsC, errorsC
+}
+
+func TestWatcherInitialization(t *testing.T) {
+	watcher := runWatcher(t, true,
+		[]types.Container{
+			types.Container{
+				ID:     "0332dbd79e20",
+				Names:  []string{"/containername", "othername"},
+				Image:  "busybox",
+				Labels: map[string]string{"foo": "bar"},
+			},
+			types.Container{
+				ID:     "6ac6ee8df5d4",
+				Names:  []string{"/other"},
+				Image:  "nginx",
+				Labels: map[string]string{},
+			},
+		},
+		nil)
+
+	assert.Equal(t, watcher.Containers(), map[string]*Container{
+		"0332dbd79e20": &Container{
+			ID:     "0332dbd79e20",
+			Name:   "containername",
+			Image:  "busybox",
+			Labels: map[string]string{"foo": "bar"},
+		},
+		"6ac6ee8df5d4": &Container{
+			ID:     "6ac6ee8df5d4",
+			Name:   "other",
+			Image:  "nginx",
+			Labels: map[string]string{},
+		},
+	})
+}
+
+func TestWatcherAddEvents(t *testing.T) {
+	watcher := runWatcher(t, true,
+		[]types.Container{
+			types.Container{
+				ID:     "0332dbd79e20",
+				Names:  []string{"/containername", "othername"},
+				Image:  "busybox",
+				Labels: map[string]string{"foo": "bar"},
+			},
+		},
+		[]interface{}{
+			events.Message{
+				Action: "start",
+				Actor: events.Actor{
+					ID: "6ac6ee8df5d4",
+					Attributes: map[string]string{
+						"name":  "other",
"image": "nginx", + "label": "value", + }, + }, + }, + }, + ) + + assert.Equal(t, watcher.Containers(), map[string]*Container{ + "0332dbd79e20": &Container{ + ID: "0332dbd79e20", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, + "6ac6ee8df5d4": &Container{ + ID: "6ac6ee8df5d4", + Name: "other", + Image: "nginx", + Labels: map[string]string{"label": "value"}, + }, + }) +} + +func TestWatcherUpdateEvent(t *testing.T) { + watcher := runWatcher(t, true, + []types.Container{ + types.Container{ + ID: "0332dbd79e20", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"label": "foo"}, + }, + }, + []interface{}{ + events.Message{ + Action: "update", + Actor: events.Actor{ + ID: "0332dbd79e20", + Attributes: map[string]string{ + "name": "containername", + "image": "busybox", + "label": "bar", + }, + }, + }, + }, + ) + + assert.Equal(t, watcher.Containers(), map[string]*Container{ + "0332dbd79e20": &Container{ + ID: "0332dbd79e20", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"label": "bar"}, + }, + }) + assert.Equal(t, len(watcher.deleted), 0) +} + +func TestWatcherKill(t *testing.T) { + watcher := runWatcher(t, false, + []types.Container{ + types.Container{ + ID: "0332dbd79e20", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"label": "foo"}, + }, + }, + []interface{}{ + events.Message{ + Action: "kill", + Actor: events.Actor{ + ID: "0332dbd79e20", + }, + }, + }, + ) + + // Check it doesn't get removed while we request meta for the container + for i := 0; i < 18; i++ { + watcher.Container("0332dbd79e20") + assert.Equal(t, len(watcher.Containers()), 1) + time.Sleep(50 * time.Millisecond) + } + + // Now it should get removed + time.Sleep(150 * time.Millisecond) + assert.Equal(t, len(watcher.Containers()), 0) +} + +func runWatcher(t *testing.T, kill bool, containers []types.Container, events []interface{}) *watcher { + client := &MockClient{ + containers: containers, + events: events, + done: make(chan interface{}), + } + + watcher, err := NewWatcherWithClient(client, 100*time.Millisecond) + if err != nil { + t.Fatal(err) + } + + err = watcher.Start() + if err != nil { + t.Fatal(err) + } + + <-client.done + if kill { + watcher.Stop() + watcher.stopped.Wait() + } + + return watcher +} diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index ec18bf58ca6d..6f1c1f0b8983 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -7,11 +7,14 @@ import ( ) type kubeAnnotatorConfig struct { - InCluster bool `config:"in_cluster"` - KubeConfig string `config:"kube_config"` - Host string `config:"host"` - Namespace string `config:"namespace"` - SyncPeriod time.Duration `config:"sync_period"` + InCluster bool `config:"in_cluster"` + KubeConfig string `config:"kube_config"` + Host string `config:"host"` + Namespace string `config:"namespace"` + SyncPeriod time.Duration `config:"sync_period"` + // Annotations are kept after pod is removed, until they haven't been accessed + // for a full `cleanup_timeout`: + CleanupTimeout time.Duration `config:"cleanup_timeout"` Indexers PluginConfig `config:"indexers"` Matchers PluginConfig `config:"matchers"` DefaultMatchers Enabled `config:"default_matchers"` @@ -31,6 +34,7 @@ func defaultKuberentesAnnotatorConfig() kubeAnnotatorConfig { return kubeAnnotatorConfig{ 
InCluster: true, SyncPeriod: 1 * time.Second, + CleanupTimeout: 60 * time.Second, Namespace: "kube-system", DefaultMatchers: Enabled{true}, DefaultIndexers: Enabled{true}, diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index c41b65582c65..3be48a46596e 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -175,7 +175,7 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { logp.Debug("kubernetes", "Using host ", config.Host) logp.Debug("kubernetes", "Initializing watcher") if client != nil { - watcher := NewPodWatcher(client, &indexers, config.SyncPeriod, config.Host) + watcher := NewPodWatcher(client, &indexers, config.SyncPeriod, config.CleanupTimeout, config.Host) if watcher.Run() { return &kubernetesAnnotator{podWatcher: watcher, matchers: &matchers}, nil diff --git a/libbeat/processors/add_kubernetes_metadata/podwatcher.go b/libbeat/processors/add_kubernetes_metadata/podwatcher.go index 2d0ef0e2fca3..db539a52b65b 100644 --- a/libbeat/processors/add_kubernetes_metadata/podwatcher.go +++ b/libbeat/processors/add_kubernetes_metadata/podwatcher.go @@ -17,6 +17,7 @@ import ( type PodWatcher struct { kubeClient *k8s.Client syncPeriod time.Duration + cleanupTimeout time.Duration podQueue chan *corev1.Pod nodeFilter k8s.Option lastResourceVersion string @@ -27,21 +28,21 @@ type PodWatcher struct { } type annotationCache struct { - sync.Mutex + sync.RWMutex annotations map[string]common.MapStr - pods map[string]*Pod // pod uid -> Pod + pods map[string]*Pod // pod uid -> Pod + deleted map[string]time.Time // deleted annotations key -> last access time } -type NodeOption struct{} - // NewPodWatcher initializes the watcher client to provide a local state of // pods from the cluster (filtered to the given host) -func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod time.Duration, host string) *PodWatcher { +func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod, cleanupTimeout time.Duration, host string) *PodWatcher { ctx, cancel := context.WithCancel(context.Background()) return &PodWatcher{ kubeClient: kubeClient, indexers: indexers, syncPeriod: syncPeriod, + cleanupTimeout: cleanupTimeout, podQueue: make(chan *corev1.Pod, 10), nodeFilter: k8s.QueryParam("fieldSelector", "spec.nodeName="+host), lastResourceVersion: "0", @@ -50,6 +51,7 @@ func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod time.D annotationCache: annotationCache{ annotations: make(map[string]common.MapStr), pods: make(map[string]*Pod), + deleted: make(map[string]time.Time), }, } } @@ -102,8 +104,9 @@ func (p *PodWatcher) watchPods() { } func (p *PodWatcher) Run() bool { - // Start pod processing worker: + // Start pod processing & annotations cleanup workers go p.worker() + go p.cleanupWorker() // Make sure that events don't flow into the annotator before informer is fully set up // Sync initial state: @@ -133,6 +136,9 @@ func (p *PodWatcher) onPodAdd(pod *Pod) { for _, m := range metadata { p.annotationCache.annotations[m.Index] = m.Data + + // un-delete if it's flagged (in case of update or recreation) + delete(p.annotationCache.deleted, m.Index) } } @@ -151,8 +157,10 @@ func (p *PodWatcher) onPodDelete(pod *Pod) { delete(p.annotationCache.pods, pod.Metadata.UID) + // Flag all annotations as deleted (they will be still available for a while) + now := time.Now() for _, index := 
range p.indexers.GetIndexes(pod) { - delete(p.annotationCache.annotations, index) + p.annotationCache.deleted[index] = now } } @@ -189,18 +197,65 @@ func (p *PodWatcher) worker() { } } +// Check annotations flagged as deleted for their last access time, fully delete +// the ones older than p.cleanupTimeout +func (p *PodWatcher) cleanupWorker() { + for { + // Wait a full period + time.Sleep(p.cleanupTimeout) + + select { + case <-p.ctx.Done(): + return + default: + // Check entries for timeout + var toDelete []string + timeout := time.Now().Add(-p.cleanupTimeout) + p.annotationCache.RLock() + for key, lastSeen := range p.annotationCache.deleted { + if lastSeen.Before(timeout) { + toDelete = append(toDelete, key) + } + } + p.annotationCache.RUnlock() + + // Delete timed out entries: + p.annotationCache.Lock() + for _, key := range toDelete { + delete(p.annotationCache.deleted, key) + delete(p.annotationCache.annotations, key) + } + p.annotationCache.Unlock() + } + } +} + func (p *PodWatcher) GetMetaData(arg string) common.MapStr { - p.annotationCache.Lock() - defer p.annotationCache.Unlock() - if meta, ok := p.annotationCache.annotations[arg]; ok { + p.annotationCache.RLock() + meta, ok := p.annotationCache.annotations[arg] + var deleted bool + if ok { + _, deleted = p.annotationCache.deleted[arg] + } + p.annotationCache.RUnlock() + + // Update deleted last access + if deleted { + p.annotationCache.Lock() + p.annotationCache.deleted[arg] = time.Now() + p.annotationCache.Unlock() + } + + if ok { return meta } + return nil } func (p *PodWatcher) GetPod(uid string) *Pod { - p.annotationCache.Lock() - defer p.annotationCache.Unlock() + p.annotationCache.RLock() + defer p.annotationCache.RUnlock() return p.annotationCache.pods[uid] }
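
Both processors in this patch implement the same cleanup pattern: when a container or pod goes away, its metadata is only flagged as deleted together with a last-access timestamp, every lookup of a flagged entry refreshes that timestamp, and a background worker drops entries that have gone a full `cleanup_timeout` without being read. Below is a minimal, self-contained sketch of that pattern outside the Beats code base; all type and function names in it are illustrative only, not part of the patch.

[source,go]
-------------------------------------------------------------------------------
package main

import (
	"fmt"
	"sync"
	"time"
)

// metadataCache keeps metadata for deleted keys around until they have not
// been accessed for a full timeout, mirroring the watcher/podwatcher logic.
type metadataCache struct {
	sync.RWMutex
	metadata map[string]string    // key -> metadata (simplified to a string)
	deleted  map[string]time.Time // keys flagged as deleted -> last access time
	timeout  time.Duration
}

func newMetadataCache(timeout time.Duration) *metadataCache {
	c := &metadataCache{
		metadata: make(map[string]string),
		deleted:  make(map[string]time.Time),
		timeout:  timeout,
	}
	go c.cleanupWorker()
	return c
}

// Set stores metadata and clears any pending deletion (e.g. a recreated container).
func (c *metadataCache) Set(key, value string) {
	c.Lock()
	defer c.Unlock()
	c.metadata[key] = value
	delete(c.deleted, key)
}

// Delete only flags the key; metadata stays available while it is still read.
func (c *metadataCache) Delete(key string) {
	c.Lock()
	defer c.Unlock()
	c.deleted[key] = time.Now()
}

// Get returns metadata and, if the key is flagged as deleted, refreshes its
// last access time so the cleanup worker keeps it around a while longer.
func (c *metadataCache) Get(key string) (string, bool) {
	c.Lock()
	defer c.Unlock()
	if _, flagged := c.deleted[key]; flagged {
		c.deleted[key] = time.Now()
	}
	value, ok := c.metadata[key]
	return value, ok
}

// cleanupWorker periodically removes flagged entries that have not been
// accessed for a full timeout.
func (c *metadataCache) cleanupWorker() {
	for range time.Tick(c.timeout) {
		limit := time.Now().Add(-c.timeout)
		c.Lock()
		for key, lastAccess := range c.deleted {
			if lastAccess.Before(limit) {
				delete(c.deleted, key)
				delete(c.metadata, key)
			}
		}
		c.Unlock()
	}
}

func main() {
	cache := newMetadataCache(200 * time.Millisecond)
	cache.Set("0332dbd79e20", "image=busybox")
	cache.Delete("0332dbd79e20")

	// Still available right after deletion, because it keeps being accessed.
	fmt.Println(cache.Get("0332dbd79e20"))

	// After a full timeout without any reads, the entry is gone.
	time.Sleep(500 * time.Millisecond)
	fmt.Println(cache.Get("0332dbd79e20"))
}
-------------------------------------------------------------------------------

Refreshing the timestamp on every read is what lets Filebeat keep enriching events from a log file that outlives its container, while still bounding how long stale metadata stays cached.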