diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b4f50d532925..b310a9f5970f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -35,6 +35,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add support for `initContainers` in `add_kubernetes_metadata` processor. {issue}4825[4825] - Fix the `/usr/bin/beatname` script to accept `-d "*"` as a parameter. {issue}5040[5040] - Combine `fields.yml` properties when they are defined in different sources. {issue}5075[5075] +- Keep Docker & Kubernetes pod metadata after container dies while they are needed by processors. {pull}5084[5084] - Fix `fields.yml` lookup when using `export template` with a custom `path.config` param. {issue}5089[5089] *Auditbeat* @@ -77,6 +78,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Don't start filebeat if external modules/prospectors config is wrong and reload is disabled {pull}5053[5053] - Remove error log from runnerfactory as error is returned by API. {pull}5085[5085] - Changed the number of shards in the default configuration to 3. {issue}5095[5095] +- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085] *Heartbeat* diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 7fc0c14dd0cd..e655cd5e2d9f 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -562,6 +562,19 @@ processors: lookup_fields: ["metricset.host"] ------------------------------------------------------------------------------- +The `add_kubernetes_metadata` processor has the following configuration settings: + +`in_cluster`:: (Optional) Use in cluster settings for Kubernetes client, `true` +by default. +`host`:: (Optional) In case `in_cluster` is false, use this host to connect to +Kubernetes API. +`kube_config`:: (Optional) Use given config file as configuration for Kubernetes +client. +`default_indexers.enabled`:: (Optional) Enable/Disable default pod indexers, in +case you want to specify your own. +`default_matchers.enabled`:: (Optional) Enable/Disable default pod matchers, in +case you want to specify your own. + [[add-docker-metadata]] === Add Docker metadata @@ -578,10 +591,10 @@ from Docker containers: processors: - add_docker_metadata: host: "unix:///var/run/docker.sock" - match_fields: ["system.process.cgroup.id"] - match_source: true - match_source_index: 4 - + #match_fields: ["system.process.cgroup.id"] + #match_source: true + #match_source_index: 4 + #cleanup_timeout: 60 # To connect to Docker over TLS you must specify a client and CA certificate. #ssl: # certificate_authority: "/etc/pki/root/ca.pem" @@ -600,3 +613,5 @@ It has the following settings: `match_source_index`:: (Optional) Index in the source path split by / to look for container id. It defaults to 4 to match `/var/lib/docker/containers//*.log` +`cleanup_timeout`:: (Optional) Time of inactivity to consider we can clean and +forget metadata for a container, 60s by default. diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go index a90046318e4b..024630150f19 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go @@ -79,7 +79,8 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) { if event.Fields["source"] != nil { event, err = d.sourceProcessor.Run(event) if err != nil { - return nil, err + logp.Debug("docker", "Error while extracting container ID from source path: %v", err) + return event, nil } } } diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go index dea7da0c7382..cebd43ee3af1 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go @@ -191,6 +191,8 @@ func (m *mockWatcher) Start() error { return nil } +func (m *mockWatcher) Stop() {} + func (m *mockWatcher) Container(ID string) *Container { return m.containers[ID] } diff --git a/libbeat/processors/add_docker_metadata/config.go b/libbeat/processors/add_docker_metadata/config.go index 0efcef7e6ec7..77fb02b49a52 100644 --- a/libbeat/processors/add_docker_metadata/config.go +++ b/libbeat/processors/add_docker_metadata/config.go @@ -1,5 +1,7 @@ package add_docker_metadata +import "time" + // Config for docker processor type Config struct { Host string `config:"host"` @@ -7,6 +9,10 @@ type Config struct { Fields []string `config:"match_fields"` MatchSource bool `config:"match_source"` SourceIndex int `config:"match_source_index"` + + // Annotations are kept after container is killled, until they haven't been accessed + // for a full `cleanup_timeout`: + CleanupTimeout time.Duration `config:"cleanup_timeout"` } // TLSConfig for docker socket connection diff --git a/libbeat/processors/add_docker_metadata/watcher.go b/libbeat/processors/add_docker_metadata/watcher.go index 607dec7f2ec7..4cfee779bc91 100644 --- a/libbeat/processors/add_docker_metadata/watcher.go +++ b/libbeat/processors/add_docker_metadata/watcher.go @@ -1,15 +1,17 @@ package add_docker_metadata import ( - "context" "fmt" "net/http" + "sync" "time" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/docker/go-connections/tlsconfig" + "golang.org/x/net/context" "github.com/elastic/beats/libbeat/logp" ) @@ -22,6 +24,9 @@ type Watcher interface { // Start watching docker API for new containers Start() error + // Stop watching docker API for new containers + Stop() + // Container returns the running container with the given ID or nil if unknown Container(ID string) *Container @@ -30,11 +35,15 @@ type Watcher interface { } type watcher struct { - client *client.Client + sync.RWMutex + client Client ctx context.Context stop context.CancelFunc containers map[string]*Container + deleted map[string]time.Time // deleted annotations key -> last access time + cleanupTimeout time.Duration lastValidTimestamp int64 + stopped sync.WaitGroup } // Container info retrieved by the watcher @@ -45,6 +54,12 @@ type Container struct { Labels map[string]string } +// Client for docker interface +type Client interface { + ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) + Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) +} + type WatcherConstructor func(host string, tls *TLSConfig) (Watcher, error) // NewWatcher returns a watcher running for the given settings @@ -69,28 +84,51 @@ func NewWatcher(host string, tls *TLSConfig) (Watcher, error) { } } - cli, err := client.NewClient(host, dockerAPIVersion, httpClient, nil) + client, err := client.NewClient(host, dockerAPIVersion, httpClient, nil) if err != nil { return nil, err } + return NewWatcherWithClient(client, 60*time.Second) +} + +func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher, error) { ctx, cancel := context.WithCancel(context.Background()) return &watcher{ - client: cli, - ctx: ctx, - stop: cancel, - containers: make(map[string]*Container), + client: client, + ctx: ctx, + stop: cancel, + containers: make(map[string]*Container), + deleted: make(map[string]time.Time), + cleanupTimeout: cleanupTimeout, }, nil } // Container returns the running container with the given ID or nil if unknown func (w *watcher) Container(ID string) *Container { - return w.containers[ID] + w.RLock() + container := w.containers[ID] + w.RUnlock() + + // Update last access time if it's deleted + if _, ok := w.deleted[ID]; ok { + w.Lock() + w.deleted[ID] = time.Now() + w.Unlock() + } + + return container } // Containers returns the list of known containers func (w *watcher) Containers() map[string]*Container { - return w.containers + w.RLock() + defer w.RUnlock() + res := make(map[string]*Container) + for k, v := range w.containers { + res[k] = v + } + return res } // Start watching docker API for new containers @@ -99,6 +137,8 @@ func (w *watcher) Start() error { logp.Debug("docker", "Start docker containers scanner") w.lastValidTimestamp = time.Now().Unix() + w.Lock() + defer w.Unlock() containers, err := w.client.ContainerList(w.ctx, types.ContainerListOptions{}) if err != nil { return err @@ -113,11 +153,17 @@ func (w *watcher) Start() error { } } + w.stopped.Add(2) go w.watch() + go w.cleanupWorker() return nil } +func (w *watcher) Stop() { + w.stop() +} + func (w *watcher) watch() { filters := filters.NewArgs() filters.Add("type", "container") @@ -138,22 +184,30 @@ func (w *watcher) watch() { w.lastValidTimestamp = event.Time // Add / update - if event.Action == "create" || event.Action == "update" { + if event.Action == "start" || event.Action == "update" { name := event.Actor.Attributes["name"] image := event.Actor.Attributes["image"] delete(event.Actor.Attributes, "name") delete(event.Actor.Attributes, "image") + + w.Lock() w.containers[event.Actor.ID] = &Container{ ID: event.Actor.ID, Name: name, Image: image, Labels: event.Actor.Attributes, } + + // un-delete if it's flagged (in case of update or recreation) + delete(w.deleted, event.Actor.ID) + w.Unlock() } // Delete if event.Action == "die" || event.Action == "kill" { - delete(w.containers, event.Actor.ID) + w.Lock() + w.deleted[event.Actor.ID] = time.Now() + w.Unlock() } case err := <-errors: @@ -164,8 +218,43 @@ func (w *watcher) watch() { case <-w.ctx.Done(): logp.Debug("docker", "Watcher stopped") + w.stopped.Done() return } } } } + +// Clean up deleted containers after they are not used anymore +func (w *watcher) cleanupWorker() { + for { + // Wait a full period + time.Sleep(w.cleanupTimeout) + + select { + case <-w.ctx.Done(): + w.stopped.Done() + return + default: + // Check entries for timeout + var toDelete []string + timeout := time.Now().Add(-w.cleanupTimeout) + w.RLock() + for key, lastSeen := range w.deleted { + if lastSeen.Before(timeout) { + logp.Debug("docker", "Removing container %s after cool down timeout") + toDelete = append(toDelete, key) + } + } + w.RUnlock() + + // Delete timed out entries: + w.Lock() + for _, key := range toDelete { + delete(w.deleted, key) + delete(w.containers, key) + } + w.Unlock() + } + } +} diff --git a/libbeat/processors/add_docker_metadata/watcher_test.go b/libbeat/processors/add_docker_metadata/watcher_test.go new file mode 100644 index 000000000000..3508722649a5 --- /dev/null +++ b/libbeat/processors/add_docker_metadata/watcher_test.go @@ -0,0 +1,212 @@ +package add_docker_metadata + +import ( + "testing" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" +) + +type MockClient struct { + // containers to return on ContainerList call + containers []types.Container + // event list to send on Events call + events []interface{} + + done chan interface{} +} + +func (m *MockClient) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + return m.containers, nil +} + +func (m *MockClient) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { + eventsC := make(chan events.Message) + errorsC := make(chan error) + + go func() { + for _, event := range m.events { + switch e := event.(type) { + case events.Message: + eventsC <- e + case error: + errorsC <- e + } + } + close(m.done) + }() + + return eventsC, errorsC +} + +func TestWatcherInitialization(t *testing.T) { + watcher := runWatcher(t, true, + []types.Container{ + types.Container{ + ID: "0332dbd79e20", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, + types.Container{ + ID: "6ac6ee8df5d4", + Names: []string{"/other"}, + Image: "nginx", + Labels: map[string]string{}, + }, + }, + nil) + + assert.Equal(t, watcher.Containers(), map[string]*Container{ + "0332dbd79e20": &Container{ + ID: "0332dbd79e20", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, + "6ac6ee8df5d4": &Container{ + ID: "6ac6ee8df5d4", + Name: "other", + Image: "nginx", + Labels: map[string]string{}, + }, + }) +} + +func TestWatcherAddEvents(t *testing.T) { + watcher := runWatcher(t, true, + []types.Container{ + types.Container{ + ID: "0332dbd79e20", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, + }, + []interface{}{ + events.Message{ + Action: "start", + Actor: events.Actor{ + ID: "6ac6ee8df5d4", + Attributes: map[string]string{ + "name": "other", + "image": "nginx", + "label": "value", + }, + }, + }, + }, + ) + + assert.Equal(t, watcher.Containers(), map[string]*Container{ + "0332dbd79e20": &Container{ + ID: "0332dbd79e20", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, + "6ac6ee8df5d4": &Container{ + ID: "6ac6ee8df5d4", + Name: "other", + Image: "nginx", + Labels: map[string]string{"label": "value"}, + }, + }) +} + +func TestWatcherUpdateEvent(t *testing.T) { + watcher := runWatcher(t, true, + []types.Container{ + types.Container{ + ID: "0332dbd79e20", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"label": "foo"}, + }, + }, + []interface{}{ + events.Message{ + Action: "update", + Actor: events.Actor{ + ID: "0332dbd79e20", + Attributes: map[string]string{ + "name": "containername", + "image": "busybox", + "label": "bar", + }, + }, + }, + }, + ) + + assert.Equal(t, watcher.Containers(), map[string]*Container{ + "0332dbd79e20": &Container{ + ID: "0332dbd79e20", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"label": "bar"}, + }, + }) + assert.Equal(t, len(watcher.deleted), 0) +} + +func TestWatcherKill(t *testing.T) { + watcher := runWatcher(t, false, + []types.Container{ + types.Container{ + ID: "0332dbd79e20", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"label": "foo"}, + }, + }, + []interface{}{ + events.Message{ + Action: "kill", + Actor: events.Actor{ + ID: "0332dbd79e20", + }, + }, + }, + ) + + // Check it doesn't get removed while we request meta for the container + for i := 0; i < 18; i++ { + watcher.Container("0332dbd79e20") + assert.Equal(t, len(watcher.Containers()), 1) + time.Sleep(50 * time.Millisecond) + } + + // Now it should get removed + time.Sleep(150 * time.Millisecond) + assert.Equal(t, len(watcher.Containers()), 0) +} + +func runWatcher(t *testing.T, kill bool, containers []types.Container, events []interface{}) *watcher { + client := &MockClient{ + containers: containers, + events: events, + done: make(chan interface{}), + } + + watcher, err := NewWatcherWithClient(client, 100*time.Millisecond) + if err != nil { + t.Fatal(err) + } + + err = watcher.Start() + if err != nil { + t.Fatal(err) + } + + <-client.done + if kill { + watcher.Stop() + watcher.stopped.Wait() + } + + return watcher +} diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index ec18bf58ca6d..6f1c1f0b8983 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -7,11 +7,14 @@ import ( ) type kubeAnnotatorConfig struct { - InCluster bool `config:"in_cluster"` - KubeConfig string `config:"kube_config"` - Host string `config:"host"` - Namespace string `config:"namespace"` - SyncPeriod time.Duration `config:"sync_period"` + InCluster bool `config:"in_cluster"` + KubeConfig string `config:"kube_config"` + Host string `config:"host"` + Namespace string `config:"namespace"` + SyncPeriod time.Duration `config:"sync_period"` + // Annotations are kept after pod is removed, until they haven't been accessed + // for a full `cleanup_timeout`: + CleanupTimeout time.Duration `config:"cleanup_timeout"` Indexers PluginConfig `config:"indexers"` Matchers PluginConfig `config:"matchers"` DefaultMatchers Enabled `config:"default_matchers"` @@ -31,6 +34,7 @@ func defaultKuberentesAnnotatorConfig() kubeAnnotatorConfig { return kubeAnnotatorConfig{ InCluster: true, SyncPeriod: 1 * time.Second, + CleanupTimeout: 60 * time.Second, Namespace: "kube-system", DefaultMatchers: Enabled{true}, DefaultIndexers: Enabled{true}, diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index c41b65582c65..3be48a46596e 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -175,7 +175,7 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { logp.Debug("kubernetes", "Using host ", config.Host) logp.Debug("kubernetes", "Initializing watcher") if client != nil { - watcher := NewPodWatcher(client, &indexers, config.SyncPeriod, config.Host) + watcher := NewPodWatcher(client, &indexers, config.SyncPeriod, config.CleanupTimeout, config.Host) if watcher.Run() { return &kubernetesAnnotator{podWatcher: watcher, matchers: &matchers}, nil diff --git a/libbeat/processors/add_kubernetes_metadata/podwatcher.go b/libbeat/processors/add_kubernetes_metadata/podwatcher.go index 2d0ef0e2fca3..db539a52b65b 100644 --- a/libbeat/processors/add_kubernetes_metadata/podwatcher.go +++ b/libbeat/processors/add_kubernetes_metadata/podwatcher.go @@ -17,6 +17,7 @@ import ( type PodWatcher struct { kubeClient *k8s.Client syncPeriod time.Duration + cleanupTimeout time.Duration podQueue chan *corev1.Pod nodeFilter k8s.Option lastResourceVersion string @@ -27,21 +28,21 @@ type PodWatcher struct { } type annotationCache struct { - sync.Mutex + sync.RWMutex annotations map[string]common.MapStr - pods map[string]*Pod // pod uid -> Pod + pods map[string]*Pod // pod uid -> Pod + deleted map[string]time.Time // deleted annotations key -> last access time } -type NodeOption struct{} - // NewPodWatcher initializes the watcher client to provide a local state of // pods from the cluster (filtered to the given host) -func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod time.Duration, host string) *PodWatcher { +func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod, cleanupTimeout time.Duration, host string) *PodWatcher { ctx, cancel := context.WithCancel(context.Background()) return &PodWatcher{ kubeClient: kubeClient, indexers: indexers, syncPeriod: syncPeriod, + cleanupTimeout: cleanupTimeout, podQueue: make(chan *corev1.Pod, 10), nodeFilter: k8s.QueryParam("fieldSelector", "spec.nodeName="+host), lastResourceVersion: "0", @@ -50,6 +51,7 @@ func NewPodWatcher(kubeClient *k8s.Client, indexers *Indexers, syncPeriod time.D annotationCache: annotationCache{ annotations: make(map[string]common.MapStr), pods: make(map[string]*Pod), + deleted: make(map[string]time.Time), }, } } @@ -102,8 +104,9 @@ func (p *PodWatcher) watchPods() { } func (p *PodWatcher) Run() bool { - // Start pod processing worker: + // Start pod processing & annotations cleanup workers go p.worker() + go p.cleanupWorker() // Make sure that events don't flow into the annotator before informer is fully set up // Sync initial state: @@ -133,6 +136,9 @@ func (p *PodWatcher) onPodAdd(pod *Pod) { for _, m := range metadata { p.annotationCache.annotations[m.Index] = m.Data + + // un-delete if it's flagged (in case of update or recreation) + delete(p.annotationCache.deleted, m.Index) } } @@ -151,8 +157,10 @@ func (p *PodWatcher) onPodDelete(pod *Pod) { delete(p.annotationCache.pods, pod.Metadata.UID) + // Flag all annotations as deleted (they will be still available for a while) + now := time.Now() for _, index := range p.indexers.GetIndexes(pod) { - delete(p.annotationCache.annotations, index) + p.annotationCache.deleted[index] = now } } @@ -189,18 +197,65 @@ func (p *PodWatcher) worker() { } } +// Check annotations flagged as deleted for their last access time, fully delete +// the ones older than p.cleanupTimeout +func (p *PodWatcher) cleanupWorker() { + for { + // Wait a full period + time.Sleep(p.cleanupTimeout) + + select { + case <-p.ctx.Done(): + return + default: + // Check entries for timeout + var toDelete []string + timeout := time.Now().Add(-p.cleanupTimeout) + p.annotationCache.RLock() + for key, lastSeen := range p.annotationCache.deleted { + if lastSeen.Before(timeout) { + toDelete = append(toDelete, key) + } + } + p.annotationCache.RUnlock() + + // Delete timed out entries: + p.annotationCache.Lock() + for _, key := range toDelete { + delete(p.annotationCache.deleted, key) + delete(p.annotationCache.annotations, key) + } + p.annotationCache.Unlock() + } + } +} + func (p *PodWatcher) GetMetaData(arg string) common.MapStr { - p.annotationCache.Lock() - defer p.annotationCache.Unlock() - if meta, ok := p.annotationCache.annotations[arg]; ok { + p.annotationCache.RLock() + meta, ok := p.annotationCache.annotations[arg] + var deleted bool + if ok { + _, deleted = p.annotationCache.deleted[arg] + } + p.annotationCache.RUnlock() + + // Update deleted last access + if deleted { + p.annotationCache.Lock() + p.annotationCache.deleted[arg] = time.Now() + p.annotationCache.Unlock() + } + + if ok { return meta } + return nil } func (p *PodWatcher) GetPod(uid string) *Pod { - p.annotationCache.Lock() - defer p.annotationCache.Unlock() + p.annotationCache.RLock() + defer p.annotationCache.RUnlock() return p.annotationCache.pods[uid] }