Skip to content

Commit

Permalink
Merge pull request open-telemetry#65 from movence/hsookim-state-metrics
Browse files Browse the repository at this point in the history
Add status_replicas_available/_unavailable & replicas_desired/_ready metrics for all workload types
  • Loading branch information
chadpatel authored Aug 9, 2023
2 parents b8f0f42 + fa39b7d commit c9edfc3
Show file tree
Hide file tree
Showing 17 changed files with 770 additions and 88 deletions.
46 changes: 25 additions & 21 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ const (
StatusCurrentNumberScheduled = "status_current_number_scheduled"
StatusReplicasAvailable = "status_replicas_available"
StatusReplicasUnavailable = "status_replicas_unavailable"
StatusReplicas = "status_replicas"
SpecReplicas = "spec_replicas"
StatusRunning = "status_running"
StatusTerminated = "status_terminated"
Expand All @@ -115,6 +114,8 @@ const (
StatusUnknown = "status_unknown"
StatusReady = "status_ready"
StatusScheduled = "status_scheduled"
ReplicasDesired = "replicas_desired"
ReplicasReady = "replicas_ready"

RunningPodCount = "number_of_running_pods"
RunningContainerCount = "number_of_running_containers"
Expand All @@ -133,25 +134,27 @@ const (
DiskIOTotal = "Total"

// Define the metric types
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
TypeClusterDeployment = "ClusterDeployment"
TypeClusterDaemonSet = "ClusterDaemonSet"
TypeClusterNamespace = "ClusterNamespace"
TypeService = "Service"
TypeInstance = "Instance" // mean EC2 Instance in ECS
TypeNode = "Node" // mean EC2 Instance in EKS
TypeInstanceFS = "InstanceFS"
TypeNodeFS = "NodeFS"
TypeInstanceNet = "InstanceNet"
TypeNodeNet = "NodeNet"
TypeInstanceDiskIO = "InstanceDiskIO"
TypeNodeDiskIO = "NodeDiskIO"
TypePod = "Pod"
TypePodNet = "PodNet"
TypeContainer = "Container"
TypeContainerFS = "ContainerFS"
TypeContainerDiskIO = "ContainerDiskIO"
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
TypeClusterDeployment = "ClusterDeployment"
TypeClusterDaemonSet = "ClusterDaemonSet"
TypeClusterStatefulSet = "ClusterStatefulSet"
TypeClusterReplicaSet = "ClusterReplicaSet"
TypeClusterNamespace = "ClusterNamespace"
TypeService = "Service"
TypeInstance = "Instance" // mean EC2 Instance in ECS
TypeNode = "Node" // mean EC2 Instance in EKS
TypeInstanceFS = "InstanceFS"
TypeNodeFS = "NodeFS"
TypeInstanceNet = "InstanceNet"
TypeNodeNet = "NodeNet"
TypeInstanceDiskIO = "InstanceDiskIO"
TypeNodeDiskIO = "NodeDiskIO"
TypePod = "Pod"
TypePodNet = "PodNet"
TypeContainer = "Container"
TypeContainerFS = "ContainerFS"
TypeContainerDiskIO = "ContainerDiskIO"
// Special type for pause container
// because containerd does not set container name pause container name to POD like docker does.
TypeInfraContainer = "InfraContainer"
Expand Down Expand Up @@ -245,14 +248,15 @@ func init() {
StatusConditionUnknown: UnitCount,
StatusCapacityPods: UnitCount,
StatusAllocatablePods: UnitCount,
StatusReplicas: UnitCount,
StatusReplicasAvailable: UnitCount,
StatusReplicasUnavailable: UnitCount,
StatusNumberAvailable: UnitCount,
StatusNumberUnavailable: UnitCount,
StatusDesiredNumberScheduled: UnitCount,
StatusCurrentNumberScheduled: UnitCount,
SpecReplicas: UnitCount,
ReplicasDesired: UnitCount,
ReplicasReady: UnitCount,

// kube-state-metrics equivalents
StatusRunning: UnitCount,
Expand Down
6 changes: 6 additions & 0 deletions internal/aws/containerinsight/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func getPrefixByMetricType(mType string) string {
namespace := "namespace_"
deployment := "deployment_"
daemonSet := "daemonset_"
statefulSet := "statefulset_"
replicaSet := "replicaset_"

switch mType {
case TypeInstance:
Expand Down Expand Up @@ -148,6 +150,10 @@ func getPrefixByMetricType(mType string) string {
prefix = deployment
case TypeClusterDaemonSet:
prefix = daemonSet
case TypeClusterStatefulSet:
prefix = statefulSet
case TypeClusterReplicaSet:
prefix = replicaSet
default:
log.Printf("E! Unexpected MetricType: %s", mType)
}
Expand Down
30 changes: 30 additions & 0 deletions internal/aws/k8s/k8sclient/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ type daemonSetClientWithStopper interface {
stopper
}

type statefulSetClientWithStopper interface {
StatefulSetClient
stopper
}

type K8sClient struct {
kubeConfigPath string
initSyncPollInterval time.Duration
Expand Down Expand Up @@ -237,6 +242,9 @@ type K8sClient struct {
dsMu sync.Mutex
daemonSet daemonSetClientWithStopper

ssMu sync.Mutex
statefulSet statefulSetClientWithStopper

logger *zap.Logger
}

Expand Down Expand Up @@ -282,6 +290,7 @@ func (c *K8sClient) init(logger *zap.Logger, options ...Option) error {
c.replicaSet = nil
c.deployment = nil
c.daemonSet = nil
c.statefulSet = nil

return nil
}
Expand Down Expand Up @@ -415,6 +424,26 @@ func (c *K8sClient) ShutdownDaemonSetClient() {
})
}

func (c *K8sClient) GetStatefulSetClient() StatefulSetClient {
var err error
c.ssMu.Lock()
defer c.ssMu.Unlock()
if c.statefulSet == nil || reflect.ValueOf(c.statefulSet).IsNil() {
c.statefulSet, err = newStatefulSetClient(c.clientSet, c.logger, statefulSetSyncCheckerOption(c.syncChecker))
if err != nil {
c.logger.Error("use an no-op statefulSet client instead because of error", zap.Error(err))
c.statefulSet = &noOpStatefulSetClient{}
}
}
return c.statefulSet
}

func (c *K8sClient) ShutdownStatefulSetClient() {
shutdownClient(c.statefulSet, &c.ssMu, func() {
c.statefulSet = nil
})
}

func (c *K8sClient) GetClientSet() kubernetes.Interface {
return c.clientSet
}
Expand All @@ -431,6 +460,7 @@ func (c *K8sClient) Shutdown() {
c.ShutdownReplicaSetClient()
c.ShutdownDeploymentClient()
c.ShutdownDaemonSetClient()
c.ShutdownStatefulSetClient()

// remove the current instance of k8s client from map
for key, val := range optionsToK8sClient {
Expand Down
1 change: 1 addition & 0 deletions internal/aws/k8s/k8sclient/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func transformFuncDeployment(obj interface{}) (interface{}, error) {
}
info.Status = &DeploymentStatus{
Replicas: uint32(deployment.Status.Replicas),
ReadyReplicas: uint32(deployment.Status.ReadyReplicas),
AvailableReplicas: uint32(deployment.Status.AvailableReplicas),
UnavailableReplicas: uint32(deployment.Status.UnavailableReplicas),
}
Expand Down
1 change: 1 addition & 0 deletions internal/aws/k8s/k8sclient/deployment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type DeploymentSpec struct {

type DeploymentStatus struct {
Replicas uint32
ReadyReplicas uint32
AvailableReplicas uint32
UnavailableReplicas uint32
}
12 changes: 8 additions & 4 deletions internal/aws/k8s/k8sclient/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ var deploymentObjects = []runtime.Object{
Replicas: &desired,
},
Status: appsv1.DeploymentStatus{
Replicas: 5,
Replicas: 10,
ReadyReplicas: 5,
AvailableReplicas: 5,
UnavailableReplicas: 1,
},
Expand All @@ -55,7 +56,8 @@ var deploymentObjects = []runtime.Object{
},
Status: appsv1.DeploymentStatus{
Replicas: 15,
AvailableReplicas: 15,
ReadyReplicas: 10,
AvailableReplicas: 10,
UnavailableReplicas: 2,
},
},
Expand All @@ -81,7 +83,8 @@ func TestDeploymentClient(t *testing.T) {
Replicas: 20,
},
Status: &DeploymentStatus{
Replicas: 5,
Replicas: 10,
ReadyReplicas: 5,
AvailableReplicas: 5,
UnavailableReplicas: 1,
},
Expand All @@ -94,7 +97,8 @@ func TestDeploymentClient(t *testing.T) {
},
Status: &DeploymentStatus{
Replicas: 15,
AvailableReplicas: 15,
ReadyReplicas: 10,
AvailableReplicas: 10,
UnavailableReplicas: 2,
},
},
Expand Down
54 changes: 42 additions & 12 deletions internal/aws/k8s/k8sclient/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
type ReplicaSetClient interface {
// Get the mapping between replica set and deployment
ReplicaSetToDeployment() map[string]string
ReplicaSetInfos() []*ReplicaSetInfo
}

type noOpReplicaSetClient struct {
Expand All @@ -45,6 +46,10 @@ func (nc *noOpReplicaSetClient) ReplicaSetToDeployment() map[string]string {
return map[string]string{}
}

func (nc *noOpReplicaSetClient) ReplicaSetInfos() []*ReplicaSetInfo {
return []*ReplicaSetInfo{}
}

func (nc *noOpReplicaSetClient) shutdown() {
}

Expand All @@ -66,6 +71,7 @@ type replicaSetClient struct {
mu sync.RWMutex
cachedReplicaSetMap map[string]time.Time
replicaSetToDeploymentMap map[string]string
replicaSetInfos []*ReplicaSetInfo
}

func (c *replicaSetClient) ReplicaSetToDeployment() map[string]string {
Expand All @@ -83,15 +89,21 @@ func (c *replicaSetClient) refresh() {

objsList := c.store.List()

var replicaSetInfos []*ReplicaSetInfo
tmpMap := make(map[string]string)
for _, obj := range objsList {
replicaSet := obj.(*replicaSetInfo)
ownerLoop:
for _, owner := range replicaSet.owners {
if owner.kind == deployment && owner.name != "" {
tmpMap[replicaSet.name] = owner.name
break ownerLoop
replicaSet := obj.(*ReplicaSetInfo)
if len(replicaSet.Owners) > 0 {
ownerLoop:
for _, owner := range replicaSet.Owners {
if owner.kind == deployment && owner.name != "" {
tmpMap[replicaSet.Name] = owner.name
break ownerLoop
}
}
} else {
// replicaSet without owner reference is not part of a deployment
replicaSetInfos = append(replicaSetInfos, replicaSet)
}
}

Expand All @@ -108,6 +120,17 @@ func (c *replicaSetClient) refresh() {
c.replicaSetToDeploymentMap[k] = v
c.cachedReplicaSetMap[k] = lastRefreshTime
}

c.replicaSetInfos = replicaSetInfos
}

func (c *replicaSetClient) ReplicaSetInfos() []*ReplicaSetInfo {
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.replicaSetInfos
}

func newReplicaSetClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...replicaSetClientOption) (*replicaSetClient, error) {
Expand All @@ -126,8 +149,6 @@ func newReplicaSetClient(clientSet kubernetes.Interface, logger *zap.Logger, opt
return nil, fmt.Errorf("cannot list ReplicaSet. err: %w", err)
}

c.stopChan = make(chan struct{})

c.store = NewObjStore(transformFuncReplicaSet, logger)

lw := createReplicaSetListWatch(clientSet, metav1.NamespaceAll)
Expand Down Expand Up @@ -155,11 +176,20 @@ func transformFuncReplicaSet(obj interface{}) (interface{}, error) {
if !ok {
return nil, fmt.Errorf("input obj %v is not ReplicaSet type", obj)
}
info := new(replicaSetInfo)
info.name = replicaSet.Name
info.owners = []*replicaSetOwner{}
info := new(ReplicaSetInfo)
info.Name = replicaSet.Name
info.Namespace = replicaSet.Namespace
info.Owners = []*ReplicaSetOwner{}
for _, owner := range replicaSet.OwnerReferences {
info.owners = append(info.owners, &replicaSetOwner{kind: owner.Kind, name: owner.Name})
info.Owners = append(info.Owners, &ReplicaSetOwner{kind: owner.Kind, name: owner.Name})
}
info.Spec = &ReplicaSetSpec{
Replicas: uint32(*replicaSet.Spec.Replicas),
}
info.Status = &ReplicaSetStatus{
Replicas: uint32(replicaSet.Status.Replicas),
AvailableReplicas: uint32(replicaSet.Status.AvailableReplicas),
ReadyReplicas: uint32(replicaSet.Status.ReadyReplicas),
}
return info, nil
}
Expand Down
21 changes: 17 additions & 4 deletions internal/aws/k8s/k8sclient/replicaset_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,25 @@

package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"

type replicaSetInfo struct {
name string
owners []*replicaSetOwner
type ReplicaSetInfo struct {
Name string
Namespace string
Owners []*ReplicaSetOwner
Spec *ReplicaSetSpec
Status *ReplicaSetStatus
}

type replicaSetOwner struct {
type ReplicaSetOwner struct {
kind string
name string
}

type ReplicaSetSpec struct {
Replicas uint32
}

type ReplicaSetStatus struct {
Replicas uint32
AvailableReplicas uint32
ReadyReplicas uint32
}
Loading

0 comments on commit c9edfc3

Please sign in to comment.