[Agent] Support Node and Service autodiscovery in k8s provider #26801

Merged · 25 commits · Jul 22, 2021
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -117,3 +117,4 @@
- Set `agent.id` to the Fleet Agent ID in events published from inputs backed by Beats. {issue}21121[21121] {pull}26394[26394]
- Enable configuring monitoring namespace. {issue}26439[26439]
- Communicate with Fleet Server over HTTP2. {pull}26474[26474]
- Support Node and Service autodiscovery in kubernetes dynamic provider. {pull}26801[26801]
@@ -9,24 +9,72 @@ package kubernetes

import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
)

// Config for kubernetes provider
type Config struct {
Scope string `config:"scope"`
Resources Resources `config:"resources"`

// expand config settings at root level of config too
KubeConfig string `config:"kube_config"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`

// Needed when resource is a pod
// Needed when resource is a Pod or Node
Node string `config:"node"`
}

// Scope of the provider (cluster or node)
Scope string `config:"scope"`
// Resources config section for resources' config blocks
type Resources struct {
Pod *ResourceConfig `config:"pod"`
Node *ResourceConfig `config:"node"`
Service *ResourceConfig `config:"service"`
Contributor

Does it make sense to have kube_config and node for every type? Is the idea here that you can get pods from one cluster and different nodes from another cluster? Or is this needed for scoped permissions in each kube_config?

This will also change the configuration shape of the provider, which is a breaking configuration change.

Now it will be:

providers:
  kubernetes:
    pod:
      node: ${HOSTNAME}
    node:
      node: ${HOSTNAME}
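
For contrast, the flat shape that works today, going by the top-level Config fields in this diff, is roughly the following (values are illustrative):

providers:
  kubernetes:
    node: ${HOSTNAME}
    kube_config: /path/to/kubeconfig   # illustrative path; typically omitted so the in-cluster config is used
    sync_period: 10m
    cleanup_timeout: 60s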

Member Author

Yeap, I had in mind to provide that kind of flexibility and give the option of different permissions per resource. However, the generic use case here is in-cluster configs, which means that after the improvements of #26947 the kube_config and node settings won't be needed in the general case, since the defaults will be enough.
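
Assuming those in-cluster defaults land, a minimal sketch of that general use case would be just enabling the desired resources and leaving everything else to the defaults (hypothetical shape, not settled in this PR):

providers:
  kubernetes:
    resources:
      pod: {}
      node: {}
      service: {}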

Contributor

Could we still have top-level defaults for all of these options? So it works as it does today, but you can provide specifics for each resource type if you want. That would avoid changing what we have today, since existing configurations would keep working, while also providing the flexibility that you are adding here.
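
A sketch of what that could look like, assuming the top-level settings act as defaults and a per-resource block only overrides what it sets (illustrative only, not the final shape):

providers:
  kubernetes:
    node: ${HOSTNAME}        # top-level values act as defaults for every resource watcher
    sync_period: 10m
    resources:
      pod: {}                # inherits the top-level defaults
      service:
        kube_config: /path/to/other/kubeconfig   # hypothetical per-resource override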

}

// ResourceConfig for kubernetes resource
type ResourceConfig struct {
KubeConfig string `config:"kube_config"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`

// Needed when resource is a Pod or Node
Node string `config:"node"`
Contributor

I'm wondering about use cases for resource-specific settings, do you have any in mind?

Member Author

Now that I'm thinking about it again, providing this option now may be over-engineering, since the base config shared by all resources should cover the common cases. Flexibility for different access or settings per resource would be nice to think about, but we can wait and see whether users actually need it. So I will change this and move to a single config for all of the resources.

}

// InitDefaults initializes the default values for the config.
func (c *Config) InitDefaults() {
c.SyncPeriod = 10 * time.Minute
c.CleanupTimeout = 60 * time.Second
Member

We may be hitting issue #20543 here; not sure how we can do anything about it now, since it depends on the kind of data we are collecting.

Member Author

Yeap, the provider is not really aware of the inputs right now, but this could be handled better in the future if we introduce a "smart" controller that enables providers according to the inputs.

c.Scope = "node"
}

// Validate ensures correctness of config
func (c *Config) Validate() error {
// Check if resource is service. If yes then default the scope to "cluster".
if c.Resources.Service != nil {
if c.Scope == "node" {
Contributor

It's interesting that you can override almost all settings per resource, but not scope.


logp.L().Warnf("cannot set scope to `node` when using resource `Service`; resetting scope to `cluster`")
Contributor

Should this logger be namespaced? I also see logger widely used in the agent; not sure if there is any preference here.

}
c.Scope = "cluster"
}
baseCfg := &ResourceConfig{
CleanupTimeout: c.CleanupTimeout,
SyncPeriod: c.SyncPeriod,
KubeConfig: c.KubeConfig,
Namespace: c.Namespace,
Node: c.Node,
}
if c.Resources.Pod == nil {
c.Resources.Pod = baseCfg
Contributor

What if the user only overrides resources.pod.namespace? Does that mean that the rest of the settings will be empty?
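
For illustration, the case being asked about is an assumed config like:

providers:
  kubernetes:
    sync_period: 10m
    resources:
      pod:
        namespace: kube-system

As the diff stands, resources.pod is non-nil in that case, so Validate does not fall back to baseCfg, and the fields the user did not set (kube_config, node, sync_period, cleanup_timeout) keep their zero values instead of inheriting the top-level settings.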

}
if c.Resources.Node == nil {
c.Resources.Node = baseCfg
}

return nil
}
@@ -6,7 +6,8 @@ package kubernetes

import (
"fmt"
"time"

k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
@@ -16,10 +17,14 @@ import (
)

const (
// NodePriority is the priority that node mappings are added to the provider.
NodePriority = 0
// PodPriority is the priority that pod mappings are added to the provider.
PodPriority = 0
PodPriority = 1
// ContainerPriority is the priority that container mappings are added to the provider.
ContainerPriority = 1
ContainerPriority = 2
// ServicePriority is the priority that service mappings are added to the provider.
ServicePriority = 3
)

func init() {
@@ -31,12 +36,6 @@ type dynamicProvider struct {
config *Config
}

type eventWatcher struct {
logger *logger.Logger
cleanupTimeout time.Duration
comm composable.DynamicProviderComm
}

// DynamicProviderBuilder builds the dynamic provider.
func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
var cfg Config
@@ -50,172 +49,99 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable
return &dynamicProvider{logger, &cfg}, nil
}

// Run runs the environment context provider.
// Run runs the kubernetes context provider.
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
if p.config.Resources.Pod != nil {
resourceConfig := p.config.Resources.Pod
err := p.watchResource(comm, "pod", resourceConfig)
if err != nil {
return err
}
}
if p.config.Resources.Node != nil {
resourceConfig := p.config.Resources.Node
err := p.watchResource(comm, "node", resourceConfig)
if err != nil {
return err
}
}
if p.config.Resources.Service != nil {
resourceConfig := p.config.Resources.Service
err := p.watchResource(comm, "service", resourceConfig)
if err != nil {
return err
}
}
return nil
}

// watchResource initializes the proper watcher according to the given resource (pod, node, service)
// and starts watching for such resource's events.
func (p *dynamicProvider) watchResource(
comm composable.DynamicProviderComm,
resourceType string,
resourceConfig *ResourceConfig) error {
client, err := kubernetes.GetKubernetesClient(resourceConfig.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes provider skipped, unable to connect: %s", err)
p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)
return nil
}

// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
p.logger.Infof("Kubernetes provider started for resource %s with %s scope", resourceType, p.config.Scope)
if p.config.Scope == "node" {
p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
p.logger.Debugf(
"Initializing Kubernetes watcher for resource %s using node: %v",
resourceType,
resourceConfig.Node)
resourceConfig.Node = kubernetes.DiscoverKubernetesNode(
p.logger, resourceConfig.Node,
kubernetes.IsInCluster(resourceConfig.KubeConfig),
client)
} else {
p.config.Node = ""
resourceConfig.Node = ""
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: p.config.SyncPeriod,
Node: p.config.Node,
//Namespace: p.config.Namespace,
}, nil)
watcher, err := p.newWatcher(resourceType, comm, client, resourceConfig)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher")
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
}
watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})

err = watcher.Start()
if err != nil {
return errors.New(err, "couldn't start kubernetes watcher")
return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType)
}

return nil
}

func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
}

// Emit the pod
// We emit Pod + containers to ensure that configs matching Pod only
// get Pod metadata (not specific to any container)
p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)

// Emit all containers in the pod
p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)

// TODO deal with init containers stopping after initialization
p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
}

func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
// Collect all runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
cid, runtime := kubernetes.ContainerIDWithRuntime(c)
containerIDs[c.Name] = cid
runtimes[c.Name] = runtime
}

for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
cid := containerIDs[c.Name]
if cid == "" {
continue
// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
func (p *dynamicProvider) newWatcher(
resourceType string,
comm composable.DynamicProviderComm,
client k8s.Interface,
config *ResourceConfig) (kubernetes.Watcher, error) {
switch resourceType {
case "pod":
watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}

// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)

mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
"container": map[string]interface{}{
"id": cid,
"name": c.Name,
"image": c.Image,
"runtime": runtimes[c.Name],
},
return watcher, nil
case "node":
watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
return watcher, nil
case "service":
watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}

// Emit the container
p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
return watcher, nil
default:
return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType)
}
}

func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) {
p.comm.Remove(string(pod.GetUID()))

for _, c := range pod.Spec.Containers {
// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
p.comm.Remove(eventID)
}

for _, c := range pod.Spec.InitContainers {
// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
p.comm.Remove(eventID)
}
}

// OnAdd ensures processing of pod objects that are newly added
func (p *eventWatcher) OnAdd(obj interface{}) {
p.logger.Debugf("pod add: %+v", obj)
p.emitRunning(obj.(*kubernetes.Pod))
}

// OnUpdate emits events for a given pod depending on the state of the pod:
// if it is terminating, a stop event is scheduled; if not, stop and start
// events are sent sequentially to recreate the resources associated with the pod.
func (p *eventWatcher) OnUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)

p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
switch pod.Status.Phase {
case kubernetes.PodSucceeded, kubernetes.PodFailed:
time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
return
case kubernetes.PodPending:
p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj)
return
}

p.logger.Debugf("pod update: %+v", obj)
p.emitRunning(pod)
}

// OnDelete stops pod objects that are deleted
func (p *eventWatcher) OnDelete(obj interface{}) {
p.logger.Debugf("pod delete: %+v", obj)
pod := obj.(*kubernetes.Pod)
time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
}