From 4363fcc6ddd5a53f578aa52aeeeabfd5c1fb7132 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 26 Mar 2020 10:04:02 -0400 Subject: [PATCH] [BUG] Use Pod.Status.Phase for pod updates in kubernetes autodiscovery (#17223) (#17248) * Use Pod.Status.Phase for pod updates in kubernetes autodiscovery This change fixes several issues with filebeat losing events when using kubernetes autodiscovery, caused by incorrect handling of pod states. Switch the pod status verification in OnUpdate() from ObjectMeta.DeletionTimestamp (which is present only for deleted pods) to Pod.Status.Phase in order to correctly handle pod states. ObjectMeta.DeletionTimestamp is only present for deleted pods, so when a pod runs to completion (e.g. pods generated by cronjobs), OnUpdate() emits a pod stop event immediately, disregarding the CleanupTimeout and leading to early termination of running beats. * add issue PR reference to changelog (cherry picked from commit 70237a7b1ae1b36b9309278ee17085ea34cc032b) Co-authored-by: Bruno Moura --- CHANGELOG.next.asciidoc | 1 + .../autodiscover/providers/kubernetes/pod.go | 24 ++++++++----------- libbeat/common/kubernetes/types.go | 13 ++++++++++ 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2a6c4ab6d7c..b99b221b510 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -71,6 +71,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update replicaset group to apps/v1 {pull}15854[15802] - Fix `metricbeat test output` with an ipv6 ES host in the output.hosts. {pull}15368[15368] - Fix `convert` processor conversion of string to integer with leading zeros. {issue}15513[15513] {pull}15557[15557] +- Fix Kubernetes autodiscovery provider to correctly handle pod states and avoid missing event data {pull}17223[17223] - Fix `add_cloud_metadata` to better support modifying sub-fields with other processors. 
{pull}13808[13808] - Fix panic in the Logstash output when trying to send events to closed connection. {pull}15568[15568] - Fix missing output in dockerlogbeat {pull}15719[15719] diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index b7239e3004e..9e3c1974687 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -133,22 +133,18 @@ func (p *pod) OnAdd(obj interface{}) { // events are sent sequentially to recreate the resources assotiated to the pod. func (p *pod) OnUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) - if pod.GetObjectMeta().GetDeletionTimestamp() != nil { - p.logger.Debugf("Watcher Node update (terminating): %+v", obj) - // Node is terminating, don't reload its configuration and ignore the event - // if some pod is still running, we will receive more events when containers - // terminate. - for _, container := range pod.Status.ContainerStatuses { - if container.State.Running != nil { - return - } - } + + // If Pod is in a phase where all containers in the have terminated emit a stop event + if pod.Status.Phase == kubernetes.PodSucceeded || pod.Status.Phase == kubernetes.PodFailed { + p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) + time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) - } else { - p.logger.Debugf("Watcher Node update: %+v", obj) - p.emit(pod, "stop") - p.emit(pod, "start") + return } + + p.logger.Infof("Watcher Pod update: %+v", obj) + p.emit(pod, "stop") + p.emit(pod, "start") } // GenerateHints creates hints needed for hints builder diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 5648829efe6..b9dcf31e886 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -72,6 +72,19 @@ type StatefulSet = appsv1.StatefulSet // Service data type Service = v1.Service +const ( + // PodPending phase + 
PodPending = v1.PodPending + // PodRunning phase + PodRunning = v1.PodRunning + // PodSucceeded phase + PodSucceeded = v1.PodSucceeded + // PodFailed phase + PodFailed = v1.PodFailed + // PodUnknown phase + PodUnknown = v1.PodUnknown +) + // Time extracts time from k8s.Time type func Time(t *metav1.Time) time.Time { return t.Time