Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new OTel pipeline for scraping Kueue metrics and publishing to CloudWatch #1426

Merged
merged 34 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
71c57f6
add processor and translator for kueue metrics (#844)
rohangujarathi Oct 25, 2024
39ad36f
Add Kueue Metrics Translator Definitions (#847)
transtv Oct 25, 2024
5242cb4
add NodeName to kueueLabelFilter and fix unit tests (#862)
rohangujarathi Oct 25, 2024
4c9b6b6
drop ResourceMetricAttributes for kueue (#866)
rohangujarathi Oct 30, 2024
97581d4
change: separate kueue metrics into its own container insights pipeline
rvasahu-amazon Oct 30, 2024
41f1403
change: add separate emf exporter config for kueue metrics
rvasahu-amazon Oct 30, 2024
1a79607
refactor: remove unnecessary kueue config field from emf exporter
rvasahu-amazon Nov 6, 2024
edcf26b
change: add awscontainerinsightskueuereceiver as a new receiver type …
rvasahu-amazon Nov 6, 2024
71d8f8f
change: adjust kueue pipeline translator to use awscontainerinsightsk…
rvasahu-amazon Nov 6, 2024
beb8e4f
refactor: only create kueue pipeline translator if kueue_container_in…
rvasahu-amazon Nov 6, 2024
ea4c0f0
change: remove kueue flag from container insights receiver translator
rvasahu-amazon Nov 6, 2024
024f0b3
change: add default emf config for kueue
rvasahu-amazon Nov 6, 2024
bf8e923
test: update tocw sample configs to reflect a separate receiver for k…
rvasahu-amazon Nov 6, 2024
10492d6
change: add placeholder version for new receiver
rvasahu-amazon Nov 6, 2024
84f5d9f
doc: add licence to containerinsights/translators.go
rvasahu-amazon Nov 6, 2024
b5b0511
fix: add another placeholder for new awscontainerinsightskueuereceive…
rvasahu-amazon Nov 6, 2024
47db1d8
refactor: simplify emf exporter helper functions
rvasahu-amazon Nov 7, 2024
fef26a5
refactor: no need to always return nil error from containerinsights/t…
rvasahu-amazon Nov 7, 2024
481e9e9
refactor: move KueueContainerInsightsEnabled to containerinsights pac…
rvasahu-amazon Nov 7, 2024
7e54555
doc: remove commented code
rvasahu-amazon Nov 7, 2024
f387805
change: remove redundant helper function
rvasahu-amazon Nov 7, 2024
204f0a2
fix: move KueueContainerInsightsEnabled to common package
rvasahu-amazon Nov 7, 2024
db141b6
refactor: remove unneeded KueueContainerInsightsCall
rvasahu-amazon Nov 7, 2024
8515308
doc: lint fix to awsemf/kueue.go
rvasahu-amazon Nov 7, 2024
35ce89c
Merge branch 'main-kueue' into kueue-for-cw
rvasahu-amazon Nov 12, 2024
dfd17ce
build: fill placeholder for kueue receiver in go.mod
rvasahu-amazon Nov 12, 2024
2e1e1f3
fix: merge conflict for AWS EMF exporter translator
rvasahu-amazon Nov 12, 2024
3b80984
Merge remote-tracking branch 'public/main' into kueue-for-cw
rvasahu-amazon Nov 14, 2024
0d24e7d
doc: move kueue receiver dependency to require block with other recei…
rvasahu-amazon Nov 14, 2024
6ab1cea
build: update versions for otel dependencies
rvasahu-amazon Nov 14, 2024
f6fad06
build: re-enable github PR build actions
rvasahu-amazon Nov 14, 2024
ffaebc5
fix: add pipeline name back as a parameter
rvasahu-amazon Nov 14, 2024
8cf5193
fix: add new entitystore extension to kueue translated yamls
rvasahu-amazon Nov 14, 2024
fa60db6
build: update version for awsemfexporter
rvasahu-amazon Nov 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 0 additions & 69 deletions .github/workflows/PR-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,72 +68,3 @@ jobs:
- name: Check license and imports
if: needs.changes.outputs.lint == 'true'
run: make simple-lint

build:
needs: [lint, changes]
name: Build ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest, windows-2019, windows-latest, macos-12]
include:
- os: ubuntu-latest
family: linux
cache-path: |
~/.cache/go-build
~/go/pkg/mod
- os: macos-12
family: darwin
cache-path: |
~/Library/Caches/go-build
~/go/pkg/mod
- os: windows-2019
family: windows
cache-path: |
~\AppData\Local\go-build
~\go\pkg\mod
- os: windows-latest
family: windows
cache-path: |
~\AppData\Local\go-build
~\go\pkg\mod
steps:
- name: Set up Go 1.x
if: needs.changes.outputs.build == 'true'
uses: actions/setup-go@v4
with:
go-version: ~1.22.2
cache: false

- name: Check out code
if: needs.changes.outputs.build == 'true'
uses: actions/checkout@v3

- name: Cache binaries
id: cached_binaries
if: needs.changes.outputs.build == 'true'
uses: actions/cache@v3
with:
key: "cached-binaries-${{ matrix.os }}-${{ github.sha }}"
path: go.mod

- name: Cache build output
if: steps.cached_binaries.outputs.cache-hit != 'true' && needs.changes.outputs.build == 'true'
uses: actions/cache@v3
with:
path: ${{ matrix.cache-path }}
key: v1-go-pkg-mod-${{ matrix.os }}-${{ hashFiles('**/go.sum') }}

- name: Install make
if: matrix.family == 'windows' && steps.cached_binaries.outputs.cache-hit != 'true' && needs.changes.outputs.build == 'true'
run: choco install make

- name: Unit Test
if: steps.cached_binaries.outputs.cache-hit != 'true' && needs.changes.outputs.build == 'true'
run: make test

- name: Build
if: steps.cached_binaries.outputs.cache-hit != 'true' && needs.changes.outputs.build == 'true'
run: make amazon-cloudwatch-agent-${{ matrix.family }}

3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/reso

replace (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241104203805-20919412150d
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver v0.0.0-20241112181539-bc8278558352
sky333999 marked this conversation as resolved.
Show resolved Hide resolved
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241104203805-20919412150d
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241104203805-20919412150d
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20241104203805-20919412150d
Expand Down Expand Up @@ -208,6 +209,8 @@ require (
k8s.io/klog/v2 v2.120.1
)

require github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver v0.0.0-20241112181539-bc8278558352
rvasahu-amazon marked this conversation as resolved.
Show resolved Hide resolved

require (
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourc
github.com/amazon-contributing/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0-20241104203805-20919412150d/go.mod h1:uzpU7Y6+oL6RdOv8IWi6fjT8LNV6FYX6CN6NATLJOiQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241104203805-20919412150d h1:LvHBMj+2Kh+SAESWTVoLbJ8bD4Xq3toB4SU66hYjg0M=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver v0.0.0-20241104203805-20919412150d/go.mod h1:StgsMi0cNUydO2N/7WbLYPUBXzvp9wIMcWp9P8x/Vck=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver v0.0.0-20241112181539-bc8278558352 h1:WqkB8yOj8YCSxWzVUVCeVx263UWnmBUpMquetzwmntQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver v0.0.0-20241112181539-bc8278558352/go.mod h1://9Xy+KG8K9KvujBh6sZXIYPDvbu8xsiU1l8StFHjMA=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241104203805-20919412150d h1:eQsOVRzXk32sVDkwJALfN0A4kZEMC2NNG8jBjZAZ7UM=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/awsxrayreceiver v0.0.0-20241104203805-20919412150d/go.mod h1:igQaQJt7eA/y3dZ2VLXVql+6k/ZXBgrAa2y9FrMMIKQ=
github.com/amazon-contributing/opentelemetry-collector-contrib/receiver/jmxreceiver v0.0.0-20241104203805-20919412150d h1:Cld9lc7zzU/EV70Lv8EXylsx4ATjz9jHfsObEYfjyzQ=
Expand Down
13 changes: 13 additions & 0 deletions internal/containerinsightscommon/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ const (
SourcesKey = "Sources"
GpuDeviceKey = "GpuDevice"

ClusterQueueNameKey = "ClusterQueue"
ClusterQueueStatusKey = "Status"
ClusterQueueReasonKey = "Reason"
ClusterQueueResourceKey = "Resource"
Flavor = "Flavor"

// metric collected
CpuTotal = "cpu_usage_total"
CpuUser = "cpu_usage_user"
Expand Down Expand Up @@ -102,10 +108,17 @@ const (
NeuronHardware = "neuron_hardware"
NeuronExecutionLatency = "neuron_execution_latency"

KueuePendingWorkloads = "kueue_pending_workloads"
KueueEvictedWorkloadsTotal = "kueue_evicted_workloads_total"
KueueAdmittedActiveWorkloads = "kueue_admitted_active_workloads"
KueueClusterQueueResourceUsage = "kueue_cluster_queue_resource_usage"
KueueClusterQueueNominalUsage = "kueue_cluster_queue_nominal_quota"

TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
TypeClusterNamespace = "ClusterNamespace"
TypeService = "Service"
TypeClusterQueue = "ClusterQueue"

// Both TypeInstance and TypeNode mean EC2 Instance, they are used in ECS and EKS separately
TypeInstance = "Instance"
Expand Down
2 changes: 1 addition & 1 deletion internal/containerinsightscommon/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func MetricName(mType string, name string) string {
prefix = containerPrefix
case TypeService:
prefix = service
case TypeCluster, TypeGpuCluster:
case TypeCluster, TypeGpuCluster, TypeClusterQueue:
prefix = cluster
case K8sNamespace:
prefix = namespace
Expand Down
19 changes: 19 additions & 0 deletions plugins/processors/kueueattributes/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package kueueattributes

import (
"go.opentelemetry.io/collector/component"
)

// Config is the configuration for the kueueattributes processor. It declares
// no fields.
type Config struct{}

// Compile-time assertion that *Config can be used as a component.Config.
var _ component.Config = (*Config)(nil)

// Validate always returns nil: Config has no fields, so there is nothing to
// check.
func (cfg *Config) Validate() error {
return nil
}
18 changes: 18 additions & 0 deletions plugins/processors/kueueattributes/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package kueueattributes

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/confmap"
)

// TestUnmarshalDefaultConfig checks that unmarshalling an empty confmap into
// the default config succeeds and leaves it equal to a freshly created
// default config.
func TestUnmarshalDefaultConfig(t *testing.T) {
	f := NewFactory()
	got := f.CreateDefaultConfig()

	err := confmap.New().Unmarshal(got)
	assert.NoError(t, err)

	want := f.CreateDefaultConfig()
	assert.Equal(t, want, got)
}
56 changes: 56 additions & 0 deletions plugins/processors/kueueattributes/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package kueueattributes

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
// stability is the stability level passed to processor.WithMetrics in NewFactory.
stability = component.StabilityLevelBeta
)

var (
// TypeStr is the component type that NewFactory registers this processor under.
// NOTE(review): the error returned by component.NewType is discarded here —
// confirm "kueueattributes" is always accepted as a valid type name.
TypeStr, _ = component.NewType("kueueattributes")
// processorCapabilities declares that this processor mutates the data it receives.
processorCapabilities = consumer.Capabilities{MutatesData: true}
)

// NewFactory builds the processor.Factory for the kueueattributes processor.
// The only signal option registered is metrics, via createMetricsProcessor.
func NewFactory() processor.Factory {
	metricsOption := processor.WithMetrics(createMetricsProcessor, stability)
	return processor.NewFactory(TypeStr, createDefaultConfig, metricsOption)
}

// createDefaultConfig returns a pointer to a zero-value Config, which is the
// default configuration for this processor.
func createDefaultConfig() component.Config {
	return new(Config)
}

// createMetricsProcessor builds the metrics processor for this component.
//
// cfg must be a *Config; any other type yields a "configuration parsing
// error". On success the returned processor forwards to nextConsumer after
// running kueueAttributesProcessor.processMetrics, and advertises
// processorCapabilities.
func createMetricsProcessor(
	ctx context.Context,
	set processor.CreateSettings,
	cfg component.Config,
	nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
	kueueCfg, isKueueCfg := cfg.(*Config)
	if !isKueueCfg {
		return nil, fmt.Errorf("configuration parsing error")
	}

	kp := newKueueAttributesProcessor(kueueCfg, set.Logger)
	capabilities := processorhelper.WithCapabilities(processorCapabilities)

	return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, kp.processMetrics, capabilities)
}
45 changes: 45 additions & 0 deletions plugins/processors/kueueattributes/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package kueueattributes

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

// TestCreateDefaultConfig checks that the factory yields a non-nil default
// config that passes componenttest.CheckConfigStruct.
func TestCreateDefaultConfig(t *testing.T) {
	f := NewFactory()
	require.NotNil(t, f)

	defaultCfg := f.CreateDefaultConfig()
	assert.NotNil(t, defaultCfg, "failed to create default config")

	structErr := componenttest.CheckConfigStruct(defaultCfg)
	assert.NoError(t, structErr)
}

// TestCreateProcessor checks that the factory creates a metrics processor and
// rejects the traces and logs signals with component.ErrDataTypeIsNotSupported.
func TestCreateProcessor(t *testing.T) {
	factory := NewFactory()
	require.NotNil(t, factory)

	cfg := factory.CreateDefaultConfig()
	setting := processortest.NewNopCreateSettings()

	// testify's assert.Equal takes (expected, actual); keeping that order makes
	// failure output label the two values correctly.
	tProcessor, err := factory.CreateTracesProcessor(context.Background(), setting, cfg, consumertest.NewNop())
	assert.Equal(t, component.ErrDataTypeIsNotSupported, err)
	assert.Nil(t, tProcessor)

	mProcessor, err := factory.CreateMetricsProcessor(context.Background(), setting, cfg, consumertest.NewNop())
	assert.NoError(t, err)
	assert.NotNil(t, mProcessor)

	lProcessor, err := factory.CreateLogsProcessor(context.Background(), setting, cfg, consumertest.NewNop())
	assert.Equal(t, component.ErrDataTypeIsNotSupported, err)
	assert.Nil(t, lProcessor)
}
106 changes: 106 additions & 0 deletions plugins/processors/kueueattributes/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package kueueattributes

import (
"context"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
)

const (
// kueueMetricsIdentifier is the metric-name prefix that processMetricAttributes
// uses to decide whether a metric should be processed.
kueueMetricsIdentifier = "kueue"
)

// kueueLabelFilter is the keep-list of data point attribute keys. It is used as
// a set: filterAttributes only checks whether a key is present, so the nil
// values carry no meaning.
var kueueLabelFilter = map[string]interface{}{
containerinsightscommon.ClusterNameKey: nil,
containerinsightscommon.ClusterQueueNameKey: nil,
containerinsightscommon.ClusterQueueStatusKey: nil,
containerinsightscommon.ClusterQueueReasonKey: nil,
containerinsightscommon.ClusterQueueResourceKey: nil,
containerinsightscommon.Flavor: nil,
containerinsightscommon.NodeNameKey: nil,
}

// kueueAttributesProcessor trims data point attributes on kueue metrics down to
// the keys in labelFilter and clears resource attributes emitted by the kueue
// metrics scraper.
type kueueAttributesProcessor struct {
// Config is the embedded processor configuration.
*Config
// logger receives debug output for metric types that are skipped.
logger *zap.Logger
// labelFilter is the set of attribute keys to keep; only key presence is checked.
labelFilter map[string]interface{}
}

// newKueueAttributesProcessor returns a processor bound to the given config
// and logger, using kueueLabelFilter as its attribute keep-list.
func newKueueAttributesProcessor(config *Config, logger *zap.Logger) *kueueAttributesProcessor {
	return &kueueAttributesProcessor{
		Config:      config,
		logger:      logger,
		labelFilter: kueueLabelFilter,
	}
}

// processMetrics walks every resource, scope and metric in md. Each metric is
// handed to processMetricAttributes; once all metrics of a resource have been
// visited, the resource is passed to dropResourceMetricAttributes. md is
// modified in place and returned with a nil error.
func (d *kueueAttributesProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
	allResources := md.ResourceMetrics()
	for ri := 0; ri < allResources.Len(); ri++ {
		resource := allResources.At(ri)

		scopes := resource.ScopeMetrics()
		for si := 0; si < scopes.Len(); si++ {
			scopeMetrics := scopes.At(si).Metrics()
			for mi := 0; mi < scopeMetrics.Len(); mi++ {
				d.processMetricAttributes(scopeMetrics.At(mi))
			}
		}

		d.dropResourceMetricAttributes(resource)
	}
	return md, nil
}

// processMetricAttributes filters the data point attributes of a single
// metric. Metrics whose name does not start with kueueMetricsIdentifier are
// left untouched. Only gauge and sum metrics are handled; any other metric
// type is logged at debug level and skipped.
func (d *kueueAttributesProcessor) processMetricAttributes(m pmetric.Metric) {
	// only decorate kueue metrics
	if !strings.HasPrefix(m.Name(), kueueMetricsIdentifier) {
		return
	}

	var dps pmetric.NumberDataPointSlice
	switch m.Type() {
	case pmetric.MetricTypeGauge:
		dps = m.Gauge().DataPoints()
	case pmetric.MetricTypeSum:
		dps = m.Sum().DataPoints()
	default:
		d.logger.Debug("Ignore unknown metric type", zap.String(containerinsightscommon.MetricType, m.Type().String()))
		// dps is still the zero value here and has no backing slice, so
		// calling Len() on it would dereference nil. Stop instead of
		// falling through to the loop below.
		return
	}

	for i := 0; i < dps.Len(); i++ {
		d.filterAttributes(dps.At(i).Attributes())
	}
}

// filterAttributes removes every attribute whose key is not present in the
// processor's labelFilter. An empty labelFilter leaves attributes unchanged.
func (d *kueueAttributesProcessor) filterAttributes(attributes pcommon.Map) {
	allowed := d.labelFilter
	if len(allowed) == 0 {
		return
	}

	// drop any key that is missing from the keep-list
	attributes.RemoveIf(func(key string, _ pcommon.Value) bool {
		_, keep := allowed[key]
		return !keep
	})
}

// dropResourceMetricAttributes clears all resource attributes when the
// resource's "service.name" attribute equals
// "containerInsightsKueueMetricsScraper". Other resources are left unchanged.
func (d *kueueAttributesProcessor) dropResourceMetricAttributes(resourceMetric pmetric.ResourceMetrics) {
	const serviceNameKey = "service.name"

	attrs := resourceMetric.Resource().Attributes()
	name, found := attrs.Get(serviceNameKey)
	if !found || name.Str() != "containerInsightsKueueMetricsScraper" {
		return
	}

	attrs.Clear()
}
Loading
Loading