-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathkubernetes.go
1233 lines (1102 loc) · 43.1 KB
/
kubernetes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package util
import (
"errors"
"fmt"
"strings"
"sync"
"time"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
// kubernetesConfig holds the settings the metadata enrichers use to create
// the Kubernetes client and the resource watchers.
type kubernetesConfig struct {
// KubeConfig is passed to the Kubernetes client constructor and to the in-cluster check.
KubeConfig string `config:"kube_config"`
// KubeClientOptions is passed together with KubeConfig when the Kubernetes client is created.
KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"`
// Node is used as the configured host when discovering the node for node-scoped watchers.
Node string `config:"node"`
// SyncPeriod is used as the SyncTimeout of the watch options.
SyncPeriod time.Duration `config:"sync_period"`
// AddMetadata enables enriching metricset events with metadata from the API server.
// When it is false, config validation fails and enrichment is disabled.
AddMetadata bool `config:"add_metadata"`
// AddResourceMetadata drives which extra watchers (node, namespace, replicaset, job)
// are created and is handed to the metadata generators.
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
// Namespace is set on the watch options of namespaced resources.
Namespace string `config:"namespace"`
}
// Enricher takes Kubernetes events and enriches them with k8s metadata.
type Enricher interface {
// Start will start the Kubernetes watcher on the first call and does nothing on later calls.
// Errors are logged as warnings.
Start(*Watchers)
// Stop will stop the Kubernetes watcher.
Stop(*Watchers)
// Enrich enriches the given list of events.
Enrich([]mapstr.M)
}
// enricher is the per-metricset Enricher state. Each metricset has its own
// enricher, registered on the shared watcher of its resource.
type enricher struct {
// NOTE(review): presumably guards the metadata map; the locking code is not visible here — confirm.
sync.RWMutex
// metadata maps a resource identifier to the metadata generated for it.
metadata map[string]mapstr.M
// index builds the resource identifier for a given event.
index func(mapstr.M) string
// updateFunc generates the metadata for an added/updated resource, keyed by resource identifier.
updateFunc func(kubernetes.Resource) map[string]mapstr.M
// deleteFunc returns the identifiers belonging to a deleted resource.
deleteFunc func(kubernetes.Resource) []string
// metricsetName is the name of the metricset that owns this enricher.
metricsetName string
// resourceName is the name of the resource whose watcher this enricher relies on.
resourceName string
// isPod is set to true when resourceName is the pod resource.
isPod bool
// config is the validated metricset configuration.
config *kubernetesConfig
// log is the logger used by this enricher.
log *logp.Logger
}
// nilEnricher is the no-op Enricher handed out when metadata enrichment is
// disabled or could not be set up.
type nilEnricher struct{}

// Start is a no-op.
func (n *nilEnricher) Start(_ *Watchers) {}

// Stop is a no-op.
func (n *nilEnricher) Stop(_ *Watchers) {}

// Enrich is a no-op; the events are left untouched.
func (n *nilEnricher) Enrich(_ []mapstr.M) {}
// metaWatcher bundles a shared resource watcher with the bookkeeping needed to
// share it between several metricsets.
type metaWatcher struct {
watcher kubernetes.Watcher // watcher responsible for watching a specific resource
started bool // true if watcher has started, false otherwise
metricsetsUsing []string // list of metricsets using this shared watcher(e.g. pod, container, state_pod)
enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
metadataObjects map[string]bool // representation of a set of ids(in the form of namespace_name-resource_name) of each object received by the watcher's handler functions
nodeScope bool // whether this watcher should watch for resources in current node or in whole cluster
restartWatcher kubernetes.Watcher // replacement watcher with cluster-wide watch options, set when a node-scoped watcher must be restarted; nil when no restart is needed. Only relevant in leader nodes due to metricsets with different nodescope(pod, state_pod)
}
// Watchers is the store of all resource watchers shared between metricsets.
type Watchers struct {
// metaWatchersMap maps a resource name (e.g. "pod") to its shared watcher.
metaWatchersMap map[string]*metaWatcher
// lock guards metaWatchersMap and the metaWatcher entries it holds.
lock sync.RWMutex
}
// selector is the name of the logger used by the enrichers.
const selector = "kubernetes"
// StateMetricsetPrefix is the prefix stripped from a metricset name to obtain
// the name of the resource it uses (e.g. state_pod uses the pod resource).
const StateMetricsetPrefix = "state_"
// Names of the resources for which a watcher can be created.
const (
PodResource = "pod"
ServiceResource = "service"
DeploymentResource = "deployment"
ReplicaSetResource = "replicaset"
StatefulSetResource = "statefulset"
DaemonSetResource = "daemonset"
JobResource = "job"
NodeResource = "node"
CronJobResource = "cronjob"
PersistentVolumeResource = "persistentvolume"
PersistentVolumeClaimResource = "persistentvolumeclaim"
StorageClassResource = "storageclass"
// NamespaceResource is the exception: it keeps the state_ prefix of its metricset name.
NamespaceResource = "state_namespace"
)
// NewWatchers returns an empty, ready-to-use store of shared resource watchers.
func NewWatchers() *Watchers {
	return &Watchers{
		metaWatchersMap: make(map[string]*metaWatcher),
	}
}
// getResource returns a new, empty Kubernetes object of the kind identified by
// resourceName. It returns nil when the name is not one of the known resources.
func getResource(resourceName string) kubernetes.Resource {
	// Stays a nil interface when no case matches.
	var res kubernetes.Resource

	switch resourceName {
	case PodResource:
		res = &kubernetes.Pod{}
	case ServiceResource:
		res = &kubernetes.Service{}
	case DeploymentResource:
		res = &kubernetes.Deployment{}
	case ReplicaSetResource:
		res = &kubernetes.ReplicaSet{}
	case StatefulSetResource:
		res = &kubernetes.StatefulSet{}
	case DaemonSetResource:
		res = &kubernetes.DaemonSet{}
	case JobResource:
		res = &kubernetes.Job{}
	case CronJobResource:
		res = &kubernetes.CronJob{}
	case PersistentVolumeResource:
		res = &kubernetes.PersistentVolume{}
	case PersistentVolumeClaimResource:
		res = &kubernetes.PersistentVolumeClaim{}
	case StorageClassResource:
		res = &kubernetes.StorageClass{}
	case NodeResource:
		res = &kubernetes.Node{}
	case NamespaceResource:
		res = &kubernetes.Namespace{}
	}

	return res
}
// getExtraWatchers returns a list of the extra resources to watch based on some resource.
// The full list can be seen in https://github.com/elastic/beats/issues/37243, at Expected Watchers section.
//
// resourceName is the main resource of the metricset. addResourceMetadata
// selects which optional metadata sources are wanted; when it is nil no extra
// watcher is requested. The returned slice is never nil.
func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddResourceMetadataConfig) []string {
	extra := []string{}

	// Without a configuration there is nothing to enable. Guarding here also
	// avoids dereferencing a nil pointer in the cases below.
	if addResourceMetadata == nil {
		return extra
	}

	switch resourceName {
	case PodResource:
		if addResourceMetadata.Node.Enabled() {
			extra = append(extra, NodeResource)
		}
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
		// We need to create watchers for ReplicaSets and Jobs that it might belong to,
		// in order to be able to retrieve 2nd layer Owner metadata like in case of:
		// Deployment -> Replicaset -> Pod
		// CronJob -> job -> Pod
		if addResourceMetadata.Deployment {
			extra = append(extra, ReplicaSetResource)
		}
		if addResourceMetadata.CronJob {
			extra = append(extra, JobResource)
		}
	case ServiceResource,
		DeploymentResource,
		ReplicaSetResource,
		StatefulSetResource,
		DaemonSetResource,
		JobResource,
		CronJobResource,
		PersistentVolumeClaimResource:
		// Namespaced resources only need the namespace watcher on top of their own.
		if addResourceMetadata.Namespace.Enabled() {
			extra = append(extra, NamespaceResource)
		}
	}

	// Cluster-scoped resources (persistent volume, storage class, node,
	// namespace) and unknown names need no extra watchers.
	return extra
}
// getResourceName returns the name of the resource for a metricset.
// Example: state_pod metricset uses pod resource.
// Exception is state_namespace, which is returned unchanged because the
// namespace resource is itself named state_namespace.
//
// Only a leading state_ is removed; an occurrence elsewhere in the name is kept.
func getResourceName(metricsetName string) string {
	if metricsetName == NamespaceResource {
		return metricsetName
	}
	return strings.TrimPrefix(metricsetName, StateMetricsetPrefix)
}
// getWatchOptions assembles the kubernetes.WatchOptions for a watcher.
// The sync timeout always comes from config.SyncPeriod. When nodeScope is true
// the node is discovered and set on the options so that only objects of that
// node are watched; a discovery failure is returned as an error.
func getWatchOptions(config *kubernetesConfig, nodeScope bool, client k8sclient.Interface, log *logp.Logger) (*kubernetes.WatchOptions, error) {
	opts := &kubernetes.WatchOptions{
		SyncTimeout: config.SyncPeriod,
	}

	// Cluster-wide watchers need nothing more.
	if !nodeScope {
		return opts, nil
	}

	params := &kubernetes.DiscoverKubernetesNodeParams{
		ConfigHost:  config.Node,
		Client:      client,
		IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
		HostUtils:   &kubernetes.DefaultDiscoveryUtils{},
	}
	node, err := kubernetes.DiscoverKubernetesNode(log, params)
	if err != nil {
		return nil, fmt.Errorf("couldn't discover kubernetes node: %w", err)
	}
	opts.Node = node

	return opts, nil
}
// isNamespaced reports whether the given resource lives inside a namespace.
// Nodes, persistent volumes, storage classes and namespaces do not; every
// other name is treated as namespaced.
func isNamespaced(resourceName string) bool {
	switch resourceName {
	case NodeResource, PersistentVolumeResource, StorageClassResource, NamespaceResource:
		return false
	default:
		return true
	}
}
// createWatcher makes sure a shared watcher for resourceName exists in
// resourceWatchers, creating and storing it when it is missing.
//
// options are the watch options to use; a non-empty options.Node makes the
// watcher node-scoped. namespace is applied to the options of namespaced
// resources only. extraWatcher marks watchers created on behalf of another
// resource (node, namespace, replicaset, job); those are always cluster-wide.
//
// If the watcher already exists but is node-scoped while a cluster-wide one is
// now requested, a replacement watcher with the new options is prepared and
// stored as restartWatcher.
//
// It returns true only when a new entry was added to the map.
func createWatcher(
	resourceName string,
	resource kubernetes.Resource,
	options kubernetes.WatchOptions,
	client k8sclient.Interface,
	resourceWatchers *Watchers,
	namespace string,
	extraWatcher bool) (bool, error) {

	// The nodescope for extra watchers node, namespace, replicaset and job is always false.
	if extraWatcher {
		options.Node = ""
	}
	// The node scope decides whether an existing watcher has to be updated.
	nodeScope := options.Node != ""

	resourceWatchers.lock.Lock()
	defer resourceWatchers.lock.Unlock()

	existing, found := resourceWatchers.metaWatchersMap[resourceName]

	switch {
	case !found:
		// First request for this resource: build the watcher and register it.
		if isNamespaced(resourceName) {
			options.Namespace = namespace
		}
		watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
		if err != nil {
			return false, err
		}
		resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{
			watcher:         watcher,
			started:         false, // not started yet
			metadataObjects: make(map[string]bool),
			enrichers:       make(map[string]*enricher),
			metricsetsUsing: make([]string, 0),
			restartWatcher:  nil,
			nodeScope:       nodeScope,
		}
		return true, nil

	case existing.nodeScope && !nodeScope:
		// The watcher exists but only tracks a single node (e.g. created by the
		// pod metricset) while the caller needs the whole cluster (e.g. state_pod).
		// A running watcher cannot be updated directly, so a new one is created
		// with the wider options and the same event handler as the current one.
		if isNamespaced(resourceName) {
			options.Namespace = namespace
		}
		replacement, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
		if err != nil {
			return false, err
		}
		replacement.AddEventHandler(existing.watcher.GetEventHandler())
		existing.restartWatcher = replacement
		existing.nodeScope = nodeScope
	}

	return false, nil
}
// addToMetricsetsUsing records that the metricset metricsetUsing relies on the
// shared watcher stored under resourceName. Nothing happens when the watcher
// does not exist or the metricset is already recorded.
// The lock is taken here, so the caller must not hold it.
func addToMetricsetsUsing(resourceName string, metricsetUsing string, resourceWatchers *Watchers) {
	resourceWatchers.lock.Lock()
	defer resourceWatchers.lock.Unlock()

	mw, found := resourceWatchers.metaWatchersMap[resourceName]
	if !found {
		return
	}

	for _, name := range mw.metricsetsUsing {
		if name == metricsetUsing {
			// already registered
			return
		}
	}
	mw.metricsetsUsing = append(mw.metricsetsUsing, metricsetUsing)
}
// removeFromMetricsetsUsing drops notUsingName from the list of metricsets
// using the shared watcher stored under resourceName.
// It reports whether an entry was removed, together with the number of
// metricsets still using the watcher (0 when the watcher does not exist).
// The lock is not taken here; the caller must already hold it.
func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourceWatchers *Watchers) (bool, int) {
	mw, found := resourceWatchers.metaWatchersMap[resourceName]
	if !found {
		return false, 0
	}

	// Filter in place, reusing the existing backing array.
	kept := mw.metricsetsUsing[:0]
	removed := false
	for _, name := range mw.metricsetsUsing {
		if name == notUsingName {
			removed = true
			continue
		}
		kept = append(kept, name)
	}
	mw.metricsetsUsing = kept

	return removed, len(kept)
}
// createAllWatchers sets up every watcher a metricset needs: the main watcher
// of resourceName plus the extra watchers selected by the configuration.
// A failure on the main watcher is returned as an error; failures on extra
// watchers are only logged. The metricset is registered as a user of each
// watcher that is available.
func createAllWatchers(
	client k8sclient.Interface,
	metricsetName string,
	resourceName string,
	nodeScope bool,
	config *kubernetesConfig,
	log *logp.Logger,
	resourceWatchers *Watchers,
) error {
	mainRes := getResource(resourceName)
	if mainRes == nil {
		return fmt.Errorf("resource for name %s does not exist. Watcher cannot be created", resourceName)
	}

	options, err := getWatchOptions(config, nodeScope, client, log)
	if err != nil {
		return err
	}

	// Main watcher first (e.g. the pod watcher for the pod metricset). If it
	// cannot be created there is no point in creating the extra ones.
	mainCreated, err := createWatcher(resourceName, mainRes, *options, client, resourceWatchers, config.Namespace, false)
	if err != nil {
		return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
	}
	if mainCreated {
		log.Debugf("Created watcher %s successfully, created by %s.", resourceName, metricsetName)
	}
	addToMetricsetsUsing(resourceName, metricsetName, resourceWatchers)

	// Extra watchers, e.g. namespace and node (and possibly replicaset and job) for pod.
	for _, extra := range getExtraWatchers(resourceName, config.AddResourceMetadata) {
		extraRes := getResource(extra)
		if extraRes == nil {
			log.Errorf("Resource for name %s does not exist. Watcher cannot be created.", extra)
			continue
		}

		extraCreated, extraErr := createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, true)
		if extraErr != nil {
			log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, extraErr)
			continue
		}
		if extraCreated {
			log.Debugf("Created watcher %s successfully, created by %s.", extra, metricsetName)
		}
		addToMetricsetsUsing(extra, metricsetName, resourceWatchers)
	}

	return nil
}
// createMetadataGen builds the generic metadata generator used for every
// resource except pod and service. The watcher for resourceName must already
// exist, otherwise an error is returned. When a namespace watcher exists the
// generator is namespace-aware and reads namespace metadata from that
// watcher's store; otherwise a plain resource generator is returned.
func createMetadataGen(client k8sclient.Interface, commonConfig *conf.C, addResourceMetadata *metadata.AddResourceMetadataConfig,
	resourceName string, resourceWatchers *Watchers) (*metadata.Resource, error) {

	resourceWatchers.lock.RLock()
	defer resourceWatchers.lock.RUnlock()

	// Should not happen, as the watchers are created before the generators.
	if resourceWatchers.metaWatchersMap[resourceName] == nil {
		return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName)
	}

	nsMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]
	if nsMetaWatcher == nil {
		return metadata.NewResourceMetadataGenerator(commonConfig, client), nil
	}

	nsGen := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace,
		nsMetaWatcher.watcher.Store(), client)
	return metadata.NewNamespaceAwareResourceMetadataGenerator(commonConfig, client, nsGen), nil
}
// createMetadataGenSpecific builds the dedicated metadata generator for the
// pod or the service resource. The watcher for resourceName must already
// exist. For pods the generator is wired to the pod watcher (or its pending
// replacement) plus whichever of the node, namespace, replicaset and job
// watchers exist. For services a namespace watcher is mandatory.
// Any other resource name yields an error.
func createMetadataGenSpecific(client k8sclient.Interface, commonConfig *conf.C, addResourceMetadata *metadata.AddResourceMetadataConfig,
	resourceName string, resourceWatchers *Watchers) (metadata.MetaGen, error) {

	resourceWatchers.lock.RLock()
	defer resourceWatchers.lock.RUnlock()

	resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
	if resourceMetaWatcher == nil {
		return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName)
	}

	// watcherOf returns the watcher stored under name, or nil if there is none.
	watcherOf := func(name string) kubernetes.Watcher {
		if mw := resourceWatchers.metaWatchersMap[name]; mw != nil {
			return mw.watcher
		}
		return nil
	}

	switch resourceName {
	case PodResource:
		// Prefer the replacement watcher when a restart is pending.
		podWatcher := resourceMetaWatcher.watcher
		if resourceMetaWatcher.restartWatcher != nil {
			podWatcher = resourceMetaWatcher.restartWatcher
		}
		// E.g. for pod redis in namespace default, pod metadata comes from the pod
		// watcher, node metadata from the node watcher's store and namespace
		// metadata from the namespace watcher's store.
		return metadata.GetPodMetaGen(
			commonConfig,
			podWatcher,
			watcherOf(NodeResource),
			watcherOf(NamespaceResource),
			watcherOf(ReplicaSetResource),
			watcherOf(JobResource),
			addResourceMetadata,
		), nil

	case ServiceResource:
		nsMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]
		if nsMetaWatcher == nil {
			return nil, fmt.Errorf("could not create the metadata generator, as the watcher for namespace does not exist")
		}
		nsGen := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace,
			nsMetaWatcher.watcher.Store(), client)
		return metadata.NewServiceMetadataGenerator(commonConfig, resourceMetaWatcher.watcher.Store(),
			nsGen, client), nil
	}

	// Not expected to be reached: this function only serves pod and service.
	return nil, fmt.Errorf("failed to create a metadata generator for resource %s", resourceName)
}
// NewResourceMetadataEnricher returns a metadata enricher for a given resource.
// For the metadata enrichment, resource watchers are used which are shared between
// the different metricsets. For example for pod metricset, a pod watcher, a namespace and
// node watcher are by default needed in addition to job and replicaset watcher according
// to configuration. These watchers will be also used by other metricsets that require them
// like state_pod, state_container, node etc.
//
// base is the metricset the enricher is built for; its name selects the resource.
// metricsRepo receives the node metrics extracted from node objects.
// resourceWatchers is the shared store of watchers.
// nodeScope requests a watcher limited to the current node.
//
// Any failure during setup is logged and a no-op enricher is returned.
func NewResourceMetadataEnricher(
	base mb.BaseMetricSet,
	metricsRepo *MetricsRepo,
	resourceWatchers *Watchers,
	nodeScope bool) Enricher {

	log := logp.NewLogger(selector)

	// metricset configuration
	config, err := GetValidatedConfig(base)
	if err != nil {
		log.Info("Kubernetes metricset enriching is disabled")
		return &nilEnricher{}
	}

	// This type of config is needed for the metadata generator
	// and includes detailed settings for metadata enrichment
	commonMetaConfig := metadata.Config{}
	if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil {
		log.Errorf("Error initializing Kubernetes metadata enricher: %s", err)
		return &nilEnricher{}
	}
	commonConfig, err := conf.NewConfigFrom(&commonMetaConfig)
	if err != nil {
		// Do not hand a missing config to the metadata generators.
		log.Errorf("Error initializing Kubernetes metadata enricher: %s", err)
		return &nilEnricher{}
	}

	client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
	if err != nil {
		log.Errorf("Error creating Kubernetes client: %s", err)
		return &nilEnricher{}
	}

	metricsetName := base.Name()
	resourceName := getResourceName(metricsetName)

	// Create all watchers needed for this metricset
	err = createAllWatchers(client, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
	if err != nil {
		log.Errorf("Error starting the watchers: %s", err)
		return &nilEnricher{}
	}

	var specificMetaGen metadata.MetaGen
	var generalMetaGen *metadata.Resource
	// Create the metadata generator to be used in the watcher's event handler.
	// Both specificMetaGen and generalMetaGen implement Generate method for metadata collection.
	if resourceName == ServiceResource || resourceName == PodResource {
		specificMetaGen, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, resourceName, resourceWatchers)
	} else {
		generalMetaGen, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, resourceName, resourceWatchers)
	}
	if err != nil {
		log.Errorf("Error trying to create the metadata generators: %s", err)
		return &nilEnricher{}
	}

	// resourceID builds the identifier of a resource: namespace and name joined
	// for namespaced objects, the bare name otherwise. It reports false when the
	// object metadata cannot be accessed.
	resourceID := func(r kubernetes.Resource) (string, bool) {
		accessor, accErr := meta.Accessor(r)
		if accErr != nil {
			log.Errorf("Error creating accessor for resource of type %T: %s", r, accErr)
			return "", false
		}
		id := accessor.GetName()
		if namespace := accessor.GetNamespace(); namespace != "" {
			id = join(namespace, id)
		}
		return id, true
	}

	// updateFunc to be used as the resource watcher's add and update handler.
	// The handler function is executed when a watcher is triggered(i.e. new/updated resource).
	// It is responsible for generating the metadata for a detected resource by executing the metadata generator's Generate method.
	// It is a common handler for all resource watchers. The kind of resource(e.g. pod or deployment) is checked inside the function.
	// It returns a map of a resource identifier(i.e namespace-resource_name) as key and the metadata as value.
	// The map is empty when the resource identifier cannot be determined.
	updateFunc := func(r kubernetes.Resource) map[string]mapstr.M {
		id, ok := resourceID(r)
		if !ok {
			return map[string]mapstr.M{}
		}

		switch r := r.(type) {
		case *kubernetes.Pod:
			return map[string]mapstr.M{id: specificMetaGen.Generate(r)}

		case *kubernetes.Node:
			// Besides the metadata, node objects also feed the node metrics
			// stored in the metrics repository.
			nodeName := r.GetObjectMeta().GetName()
			metrics := NewNodeMetrics()
			if cpu, ok := r.Status.Capacity["cpu"]; ok {
				if q, err := resource.ParseQuantity(cpu.String()); err == nil {
					metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
				}
			}
			if memory, ok := r.Status.Capacity["memory"]; ok {
				if q, err := resource.ParseQuantity(memory.String()); err == nil {
					metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
				}
			}
			nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
			nodeStore.SetNodeMetrics(metrics)

			return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
		case *kubernetes.Deployment:
			return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
		case *kubernetes.Job:
			return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)}
		case *kubernetes.CronJob:
			return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)}
		case *kubernetes.Service:
			return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
		case *kubernetes.StatefulSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)}
		case *kubernetes.Namespace:
			return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)}
		case *kubernetes.ReplicaSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)}
		case *kubernetes.DaemonSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)}
		case *kubernetes.PersistentVolume:
			return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)}
		case *kubernetes.PersistentVolumeClaim:
			return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)}
		case *kubernetes.StorageClass:
			return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)}
		default:
			return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)}
		}
	}

	// deleteFunc to be used as the resource watcher's delete handler.
	// The deleteFunc is executed when a watcher is triggered for a resource deletion(e.g. pod deleted).
	// It returns the identifier of the resource, or no identifier when it cannot be determined.
	deleteFunc := func(r kubernetes.Resource) []string {
		// A deleted node also loses its metrics store.
		if node, ok := r.(*kubernetes.Node); ok {
			metricsRepo.DeleteNodeStore(node.GetObjectMeta().GetName())
		}

		id, ok := resourceID(r)
		if !ok {
			return []string{}
		}
		return []string{id}
	}

	// indexFunc constructs and returns the resource identifier from a given event.
	// If a resource is namespaced(e.g. pod) the identifier is in the form of namespace-resource_name.
	// If it is not namespaced(e.g. node) the identifier is the resource's name.
	indexFunc := func(e mapstr.M) string {
		name := getString(e, "name")
		namespace := getString(e, mb.ModuleDataKey+".namespace")
		switch {
		case name != "" && namespace != "":
			return join(namespace, name)
		case namespace != "":
			return namespace
		default:
			return name
		}
	}

	// create a metadata enricher for this metricset
	enricher := buildMetadataEnricher(
		metricsetName,
		resourceName,
		resourceWatchers,
		config,
		updateFunc,
		deleteFunc,
		indexFunc,
		log)
	if resourceName == PodResource {
		enricher.isPod = true
	}

	return enricher
}
// NewContainerMetadataEnricher returns an Enricher configured for container events.
// Container metadata is derived from pod objects, so the enricher relies on the
// shared pod watcher and the pod metadata generator.
//
// base is the metricset the enricher is built for.
// metricsRepo receives the container limits extracted from the pod spec.
// resourceWatchers is the shared store of watchers.
// nodeScope requests a pod watcher limited to the current node.
//
// Any failure during setup is logged and a no-op enricher is returned.
func NewContainerMetadataEnricher(
	base mb.BaseMetricSet,
	metricsRepo *MetricsRepo,
	resourceWatchers *Watchers,
	nodeScope bool) Enricher {

	log := logp.NewLogger(selector)

	config, err := GetValidatedConfig(base)
	if err != nil {
		log.Info("Kubernetes metricset enriching is disabled")
		return &nilEnricher{}
	}

	// This type of config is needed for the metadata generator
	commonMetaConfig := metadata.Config{}
	if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil {
		log.Errorf("Error initializing Kubernetes metadata enricher: %s", err)
		return &nilEnricher{}
	}
	commonConfig, err := conf.NewConfigFrom(&commonMetaConfig)
	if err != nil {
		// Do not hand a missing config to the metadata generators.
		log.Errorf("Error initializing Kubernetes metadata enricher: %s", err)
		return &nilEnricher{}
	}

	client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
	if err != nil {
		log.Errorf("Error creating Kubernetes client: %s", err)
		return &nilEnricher{}
	}

	metricsetName := base.Name()

	err = createAllWatchers(client, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
	if err != nil {
		log.Errorf("Error starting the watchers: %s", err)
		return &nilEnricher{}
	}

	metaGen, err := createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers)
	if err != nil {
		log.Errorf("Error trying to create the metadata generators: %s", err)
		return &nilEnricher{}
	}

	// updateFunc is the add/update handler. For every container and init
	// container of the pod it stores the configured limits in the metrics
	// repository and returns the container metadata keyed by
	// namespace, pod name and container name joined together.
	// Anything that is not a pod produces an empty result.
	updateFunc := func(r kubernetes.Resource) map[string]mapstr.M {
		metadataEvents := make(map[string]mapstr.M)

		pod, ok := r.(*kubernetes.Pod)
		if !ok || pod == nil {
			// Nothing can be derived from a non-pod object; bail out instead
			// of dereferencing a nil pod below.
			base.Logger().Debugf("Error while casting event: expected a pod, got %T", r)
			return metadataEvents
		}
		pmeta := metaGen.Generate(pod)

		statuses := make(map[string]*kubernetes.PodContainerStatus)
		mapStatuses := func(s []kubernetes.PodContainerStatus) {
			for i := range s {
				statuses[s[i].Name] = &s[i]
			}
		}
		mapStatuses(pod.Status.ContainerStatuses)
		mapStatuses(pod.Status.InitContainerStatuses)

		nodeStore, _ := metricsRepo.AddNodeStore(pod.Spec.NodeName)
		podId := NewPodId(pod.Namespace, pod.Name)
		podStore, _ := nodeStore.AddPodStore(podId)

		// The capacity is capped so that append copies instead of writing into
		// the backing array of the pod object owned by the watcher.
		regular := pod.Spec.Containers
		containers := append(regular[:len(regular):len(regular)], pod.Spec.InitContainers...)

		for _, container := range containers {
			cmeta := mapstr.M{}
			metrics := NewContainerMetrics()

			if cpu, ok := container.Resources.Limits["cpu"]; ok {
				if q, err := resource.ParseQuantity(cpu.String()); err == nil {
					metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
				}
			}
			if memory, ok := container.Resources.Limits["memory"]; ok {
				if q, err := resource.ParseQuantity(memory.String()); err == nil {
					metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
				}
			}

			containerStore, _ := podStore.AddContainerStore(container.Name)
			containerStore.SetContainerMetrics(metrics)

			if s, ok := statuses[container.Name]; ok {
				// Extracting id and runtime ECS fields from ContainerID
				// which is in the form of <container.runtime>://<container.id>
				split := strings.Index(s.ContainerID, "://")
				if split != -1 {
					kubernetes2.ShouldPut(cmeta, "container.id", s.ContainerID[split+3:], base.Logger())
					kubernetes2.ShouldPut(cmeta, "container.runtime", s.ContainerID[:split], base.Logger())
				}
			}

			id := join(pod.GetObjectMeta().GetNamespace(), pod.GetObjectMeta().GetName(), container.Name)
			cmeta.DeepUpdate(pmeta)
			metadataEvents[id] = cmeta
		}
		return metadataEvents
	}

	// deleteFunc is the delete handler. It drops the pod's store from the
	// metrics repository and returns the identifiers of all its containers.
	// Anything that is not a pod produces no identifiers.
	deleteFunc := func(r kubernetes.Resource) []string {
		ids := make([]string, 0)

		pod, ok := r.(*kubernetes.Pod)
		if !ok || pod == nil {
			base.Logger().Debugf("Error while casting event: expected a pod, got %T", r)
			return ids
		}

		podId := NewPodId(pod.Namespace, pod.Name)
		nodeStore := metricsRepo.GetNodeStore(pod.Spec.NodeName)
		nodeStore.DeletePodStore(podId)

		// Same copy-on-append precaution as in updateFunc.
		regular := pod.Spec.Containers
		containers := append(regular[:len(regular):len(regular)], pod.Spec.InitContainers...)

		for _, container := range containers {
			id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name)
			ids = append(ids, id)
		}
		return ids
	}

	// indexFunc builds the container identifier from an event:
	// namespace, pod name and container name joined together.
	indexFunc := func(e mapstr.M) string {
		return join(getString(e, mb.ModuleDataKey+".namespace"), getString(e, mb.ModuleDataKey+".pod.name"), getString(e, "name"))
	}

	enricher := buildMetadataEnricher(
		metricsetName,
		PodResource,
		resourceWatchers,
		config,
		updateFunc,
		deleteFunc,
		indexFunc,
		log,
	)

	return enricher
}
// GetValidatedConfig loads the metricset configuration with GetConfig and
// then passes it through validateConfig.
//
// If either step fails, the error is logged with logp.Err and returned
// together with a nil config. On success the validated config is returned.
func GetValidatedConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) {
	cfg, err := GetConfig(base)
	if err != nil {
		logp.Err("Error while getting config: %v", err)
		return nil, err
	}

	// Validation hands back the config it accepted, or nil with an error.
	if cfg, err = validateConfig(cfg); err != nil {
		logp.Err("Error while validating config: %v", err)
		return nil, err
	}

	return cfg, nil
}
// validateConfig checks that the given config has metadata enrichment turned on.
//
// It returns the same config when AddMetadata is true; otherwise it returns
// nil and an error stating that metadata enriching is disabled.
func validateConfig(config *kubernetesConfig) (*kubernetesConfig, error) {
	if config.AddMetadata {
		return config, nil
	}
	return nil, errors.New("metadata enriching is disabled")
}
// GetConfig builds the kubernetes configuration for the given metricset.
//
// It starts from the defaults (AddMetadata enabled, a 10 minute SyncPeriod and
// the default resource metadata config) and unpacks the module configuration
// on top of them.
//
// On failure it returns a nil config and an error whose message begins with
// "error unpacking configs" and which also carries the underlying unpack
// error, so callers can log the root cause or inspect it with errors.Is /
// errors.As.
func GetConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) {
	config := &kubernetesConfig{
		AddMetadata:         true,
		SyncPeriod:          time.Minute * 10,
		AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(),
	}
	if err := base.Module().UnpackConfig(&config); err != nil {
		// Keep the original message but attach the cause instead of dropping it.
		return nil, errors.Join(errors.New("error unpacking configs"), err)
	}
	return config, nil
}
// getString looks up key in m and returns the value when it is a string.
// It returns the empty string when the lookup fails or when the stored value
// is not a string.
func getString(m mapstr.M, key string) string {
	if raw, err := m.GetValue(key); err == nil {
		if s, isString := raw.(string); isString {
			return s
		}
	}
	return ""
}
// join concatenates the given fields into a single string, placing a ":"
// between consecutive fields. With no fields it returns the empty string.
func join(fields ...string) string {
	const separator = ":"
	return strings.Join(fields, separator)
}
// buildMetadataEnricher builds and returns a metadata enricher for a given metricset.
// It appends the new enricher to the watcher.enrichers map for the given resource watcher.
// It also updates the add, update and delete event handlers of the watcher in order to retrieve
// the metadata of all enrichers associated to that watcher.
func buildMetadataEnricher(
metricsetName string,
resourceName string,
resourceWatchers *Watchers,
config *kubernetesConfig,
updateFunc func(kubernetes.Resource) map[string]mapstr.M,
deleteFunc func(kubernetes.Resource) []string,
indexFunc func(e mapstr.M) string,
log *logp.Logger) *enricher {
enricher := &enricher{
metadata: map[string]mapstr.M{},
index: indexFunc,
updateFunc: updateFunc,
deleteFunc: deleteFunc,
resourceName: resourceName,
metricsetName: metricsetName,
config: config,
log: log,
}
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()
// Check if a watcher for this resource already exists.
resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName]
if resourceMetaWatcher != nil {
// Append the new enricher to watcher's enrichers map.
resourceMetaWatcher.enrichers[metricsetName] = enricher
// Check if this shared watcher has already detected resources and collected their
// metadata for another enricher.
// In that case, for each resource, call the updateFunc of the current enricher to
// generate its metadata. This is needed in cases where the watcher has already been
// notified for new/updated resources while the enricher for the current metricset has not
// been built yet (for example, the pod and state_pod metricsets).
for key := range resourceMetaWatcher.metadataObjects {
obj, exists, err := resourceMetaWatcher.watcher.Store().GetByKey(key)
if err != nil {
log.Errorf("Error trying to get the object from the store: %s", err)
} else {
if exists {
newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
// add the new metadata to the watcher received metadata
for id, metadata := range newMetadataEvents {
enricher.metadata[id] = metadata
}
}
}
}
// AddEventHandler sets add, update and delete methods of watcher.
// Those methods are triggered when an event is detected for a
// resource creation, update or deletion.
resourceMetaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()
// Add object(detected resource) to the list of metadata objects of this watcher,
// so it can be used by enrichers created after the event is triggered.
// The identifier of the object is in the form of namespace/name so that
// it can be easily fetched from watcher's store in previous step.
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = namespace + "/" + id
}
resourceMetaWatcher.metadataObjects[id] = true
// Execute the updateFunc of each enricher associated to this watcher.
for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
// add the new metadata to the watcher received metadata
for id, metadata := range newMetadataEvents {
enricher.metadata[id] = metadata
}
enricher.Unlock()
}
},
UpdateFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()
// Add object to the list of metadata objects of this watcher
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))