Skip to content

Commit

Permalink
[receiver/awscontainerinsightreceiver] Add new deployment and daemons…
Browse files Browse the repository at this point in the history
…et metrics (open-telemetry#20)
  • Loading branch information
sky333999 authored May 31, 2023
1 parent b482642 commit 7a68c73
Show file tree
Hide file tree
Showing 12 changed files with 896 additions and 41 deletions.
54 changes: 36 additions & 18 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ const (
StatusConditionNetworkUnavailable = "status_condition_network_unavailable"
StatusCapacityPods = "status_capacity_pods"
StatusAllocatablePods = "status_allocatable_pods"
StatusNumberAvailable = "status_number_available"
StatusNumberUnavailable = "status_number_unavailable"
StatusDesiredNumberScheduled = "status_desired_number_scheduled"
StatusCurrentNumberScheduled = "status_current_number_scheduled"
StatusReplicasAvailable = "status_replicas_available"
StatusReplicasUnavailable = "status_replicas_unavailable"
StatusReplicas = "status_replicas"
SpecReplicas = "spec_replicas"
StatusRunning = "status_running"
StatusTerminated = "status_terminated"
StatusWaiting = "status_waiting"
Expand All @@ -115,23 +123,25 @@ const (
DiskIOTotal = "Total"

// Define the metric types
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
TypeClusterNamespace = "ClusterNamespace"
TypeService = "Service"
TypeInstance = "Instance" // mean EC2 Instance in ECS
TypeNode = "Node" // mean EC2 Instance in EKS
TypeInstanceFS = "InstanceFS"
TypeNodeFS = "NodeFS"
TypeInstanceNet = "InstanceNet"
TypeNodeNet = "NodeNet"
TypeInstanceDiskIO = "InstanceDiskIO"
TypeNodeDiskIO = "NodeDiskIO"
TypePod = "Pod"
TypePodNet = "PodNet"
TypeContainer = "Container"
TypeContainerFS = "ContainerFS"
TypeContainerDiskIO = "ContainerDiskIO"
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
TypeClusterDeployment = "ClusterDeployment"
TypeClusterDaemonSet = "ClusterDaemonSet"
TypeClusterNamespace = "ClusterNamespace"
TypeService = "Service"
TypeInstance = "Instance" // mean EC2 Instance in ECS
TypeNode = "Node" // mean EC2 Instance in EKS
TypeInstanceFS = "InstanceFS"
TypeNodeFS = "NodeFS"
TypeInstanceNet = "InstanceNet"
TypeNodeNet = "NodeNet"
TypeInstanceDiskIO = "InstanceDiskIO"
TypeNodeDiskIO = "NodeDiskIO"
TypePod = "Pod"
TypePodNet = "PodNet"
TypeContainer = "Container"
TypeContainerFS = "ContainerFS"
TypeContainerDiskIO = "ContainerDiskIO"
// Special type for pause container
// because containerd does not set container name pause container name to POD like docker does.
TypeInfraContainer = "InfraContainer"
Expand Down Expand Up @@ -213,14 +223,22 @@ func init() {
FSInodesfree: UnitCount,
FSUtilization: UnitPercent,

// status metrics
// status & spec metrics
StatusConditionReady: UnitCount,
StatusConditionDiskPressure: UnitCount,
StatusConditionMemoryPressure: UnitCount,
StatusConditionPIDPressure: UnitCount,
StatusConditionNetworkUnavailable: UnitCount,
StatusCapacityPods: UnitCount,
StatusAllocatablePods: UnitCount,
StatusReplicas: UnitCount,
StatusReplicasAvailable: UnitCount,
StatusReplicasUnavailable: UnitCount,
StatusNumberAvailable: UnitCount,
StatusNumberUnavailable: UnitCount,
StatusDesiredNumberScheduled: UnitCount,
StatusCurrentNumberScheduled: UnitCount,
SpecReplicas: UnitCount,

// kube-state-metrics equivalents
StatusRunning: UnitCount,
Expand Down
6 changes: 6 additions & 0 deletions internal/aws/containerinsight/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func getPrefixByMetricType(mType string) string {
service := "service_"
cluster := "cluster_"
namespace := "namespace_"
deployment := "deployment_"
daemonSet := "daemonset_"

switch mType {
case TypeInstance:
Expand Down Expand Up @@ -142,6 +144,10 @@ func getPrefixByMetricType(mType string) string {
prefix = service
case TypeClusterNamespace:
prefix = namespace
case TypeClusterDeployment:
prefix = deployment
case TypeClusterDaemonSet:
prefix = daemonSet
default:
log.Printf("E! Unexpected MetricType: %s", mType)
}
Expand Down
60 changes: 60 additions & 0 deletions internal/aws/k8s/k8sclient/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ type replicaSetClientWithStopper interface {
stopper
}

type deploymentClientWithStopper interface {
DeploymentClient
stopper
}

type daemonSetClientWithStopper interface {
DaemonSetClient
stopper
}

type K8sClient struct {
kubeConfigPath string
initSyncPollInterval time.Duration
Expand Down Expand Up @@ -221,6 +231,12 @@ type K8sClient struct {
rsMu sync.Mutex
replicaSet replicaSetClientWithStopper

dMu sync.Mutex
deployment deploymentClientWithStopper

dsMu sync.Mutex
daemonSet daemonSetClientWithStopper

logger *zap.Logger
}

Expand Down Expand Up @@ -264,6 +280,8 @@ func (c *K8sClient) init(logger *zap.Logger, options ...Option) error {
c.node = nil
c.job = nil
c.replicaSet = nil
c.deployment = nil
c.daemonSet = nil

return nil
}
Expand Down Expand Up @@ -357,6 +375,46 @@ func (c *K8sClient) ShutdownReplicaSetClient() {
})
}

func (c *K8sClient) GetDeploymentClient() DeploymentClient {
var err error
c.dMu.Lock()
if c.deployment == nil || reflect.ValueOf(c.deployment).IsNil() {
c.deployment, err = newDeploymentClient(c.clientSet, c.logger, deploymentSyncCheckerOption(c.syncChecker))
if err != nil {
c.logger.Error("use an no-op deployment client instead because of error", zap.Error(err))
c.deployment = &noOpDeploymentClient{}
}
}
c.dMu.Unlock()
return c.deployment
}

func (c *K8sClient) ShutdownDeploymentClient() {
shutdownClient(c.deployment, &c.dMu, func() {
c.deployment = nil
})
}

func (c *K8sClient) GetDaemonSetClient() DaemonSetClient {
var err error
c.dsMu.Lock()
if c.daemonSet == nil || reflect.ValueOf(c.daemonSet).IsNil() {
c.daemonSet, err = newDaemonSetClient(c.clientSet, c.logger, daemonSetSyncCheckerOption(c.syncChecker))
if err != nil {
c.logger.Error("use an no-op daemonSet client instead because of error", zap.Error(err))
c.daemonSet = &noOpDaemonSetClient{}
}
}
c.dsMu.Unlock()
return c.daemonSet
}

func (c *K8sClient) ShutdownDaemonSetClient() {
shutdownClient(c.daemonSet, &c.dsMu, func() {
c.daemonSet = nil
})
}

func (c *K8sClient) GetClientSet() kubernetes.Interface {
return c.clientSet
}
Expand All @@ -371,6 +429,8 @@ func (c *K8sClient) Shutdown() {
c.ShutdownNodeClient()
c.ShutdownJobClient()
c.ShutdownReplicaSetClient()
c.ShutdownDeploymentClient()
c.ShutdownDaemonSetClient()

// remove the current instance of k8s client from map
for key, val := range optionsToK8sClient {
Expand Down
151 changes: 151 additions & 0 deletions internal/aws/k8s/k8sclient/daemonset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"

import (
"context"
"fmt"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sync"
)

type DaemonSetClient interface {
// DaemonSetInfos contains the information about each daemon set in the cluster
DaemonSetInfos() []*DaemonSetInfo
}

type noOpDaemonSetClient struct {
}

func (nd *noOpDaemonSetClient) DaemonSetInfos() []*DaemonSetInfo {
return []*DaemonSetInfo{}
}

func (nd *noOpDaemonSetClient) shutdown() {
}

type daemonSetClientOption func(*daemonSetClient)

func daemonSetSyncCheckerOption(checker initialSyncChecker) daemonSetClientOption {
return func(d *daemonSetClient) {
d.syncChecker = checker
}
}

type daemonSetClient struct {
stopChan chan struct{}
stopped bool

store *ObjStore

syncChecker initialSyncChecker

mu sync.RWMutex
daemonSetInfos []*DaemonSetInfo
}

func (d *daemonSetClient) refresh() {
d.mu.Lock()
defer d.mu.Unlock()

var daemonSetInfos []*DaemonSetInfo
objsList := d.store.List()
for _, obj := range objsList {
daemonSet, ok := obj.(*DaemonSetInfo)
if !ok {
continue
}
daemonSetInfos = append(daemonSetInfos, daemonSet)
}

d.daemonSetInfos = daemonSetInfos
}

func (d *daemonSetClient) DaemonSetInfos() []*DaemonSetInfo {
if d.store.GetResetRefreshStatus() {
d.refresh()
}
d.mu.RLock()
defer d.mu.RUnlock()
return d.daemonSetInfos
}

func newDaemonSetClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...daemonSetClientOption) (*daemonSetClient, error) {
d := &daemonSetClient{
stopChan: make(chan struct{}),
}

for _, option := range options {
option(d)
}

ctx := context.Background()
if _, err := clientSet.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
return nil, fmt.Errorf("cannot list DaemonSets. err: %w", err)
}

d.store = NewObjStore(transformFuncDaemonSet, logger)
lw := createDaemonSetListWatch(clientSet, metav1.NamespaceAll)
reflector := cache.NewReflector(lw, &appsv1.DaemonSet{}, d.store, 0)

go reflector.Run(d.stopChan)

if d.syncChecker != nil {
// check the init sync for potential connection issue
d.syncChecker.Check(reflector, "DaemonSet initial sync timeout")
}

return d, nil
}

func (d *daemonSetClient) shutdown() {
close(d.stopChan)
d.stopped = true
}

func transformFuncDaemonSet(obj interface{}) (interface{}, error) {
daemonSet, ok := obj.(*appsv1.DaemonSet)
if !ok {
return nil, fmt.Errorf("input obj %v is not DaemonSet type", obj)
}
info := new(DaemonSetInfo)
info.Name = daemonSet.Name
info.Namespace = daemonSet.Namespace
info.Status = &DaemonSetStatus{
NumberAvailable: uint32(daemonSet.Status.NumberAvailable),
NumberUnavailable: uint32(daemonSet.Status.NumberUnavailable),
DesiredNumberScheduled: uint32(daemonSet.Status.DesiredNumberScheduled),
CurrentNumberScheduled: uint32(daemonSet.Status.CurrentNumberScheduled),
}
return info, nil
}

func createDaemonSetListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
ctx := context.Background()
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
return client.AppsV1().DaemonSets(ns).List(ctx, opts)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
return client.AppsV1().DaemonSets(ns).Watch(ctx, opts)
},
}
}
28 changes: 28 additions & 0 deletions internal/aws/k8s/k8sclient/daemonset_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"

type DaemonSetInfo struct {
Name string
Namespace string
Status *DaemonSetStatus
}

type DaemonSetStatus struct {
NumberAvailable uint32
NumberUnavailable uint32
DesiredNumberScheduled uint32
CurrentNumberScheduled uint32
}
Loading

0 comments on commit 7a68c73

Please sign in to comment.