Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
504 changes: 213 additions & 291 deletions remappers/hostmetrics/hostmetrics_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion remappers/internal/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package hostmetrics
package internal

import (
"github.com/elastic/opentelemetry-lib/remappers/common"
Expand Down
104 changes: 104 additions & 0 deletions remappers/internal/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
"testing"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type TestMetric struct {
DP TestDP
Name string
Type pmetric.MetricType
}

type TestDP struct {
Dbl *float64
Int *int64
Attrs map[string]any
Ts pcommon.Timestamp
}

func MetricSliceToTestMetric(t *testing.T, ms pmetric.MetricSlice) []TestMetric {
testMetrics := make([]TestMetric, ms.Len())
for i := 0; i < ms.Len(); i++ {
m := ms.At(i)
testMetrics[i].Name = m.Name()
testMetrics[i].Type = m.Type()

var dps pmetric.NumberDataPointSlice
switch m.Type() {
case pmetric.MetricTypeGauge:
dps = m.Gauge().DataPoints()
case pmetric.MetricTypeSum:
dps = m.Sum().DataPoints()
}

if dps.Len() != 1 {
t.Fatalf("unexpected metric, test is written assuming each metric with a single datapoint")
}

dp := dps.At(0)
testMetrics[i].DP = TestDP{Ts: dp.Timestamp(), Attrs: dp.Attributes().AsRaw()}
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
testMetrics[i].DP.Int = Ptr(dp.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
testMetrics[i].DP.Dbl = Ptr(dp.DoubleValue())
}
}

return testMetrics
}

func TestMetricToMetricSlice(t testing.TB, testMetrics []TestMetric, out pmetric.MetricSlice) {
out.EnsureCapacity(len(testMetrics))

for _, testm := range testMetrics {
m := out.AppendEmpty()
m.SetName(testm.Name)

var dps pmetric.NumberDataPointSlice
switch typ := testm.Type; typ {
case pmetric.MetricTypeGauge:
dps = m.SetEmptyGauge().DataPoints()
case pmetric.MetricTypeSum:
dps = m.SetEmptySum().DataPoints()
default:
t.Fatalf("unhandled metric type %s", typ)
}

dp := dps.AppendEmpty()
dp.SetTimestamp(testm.DP.Ts)
if testm.DP.Int != nil {
dp.SetIntValue(*testm.DP.Int)
} else if testm.DP.Dbl != nil {
dp.SetDoubleValue(*testm.DP.Dbl)
}
if err := dp.Attributes().FromRaw(testm.DP.Attrs); err != nil {
t.Fatalf("failed to copy attributes from test data: %v", err)
}
}
}

func Ptr[T any](v T) *T {
return &v
}
69 changes: 69 additions & 0 deletions remappers/kubernetesmetrics/clustermetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kubernetesmetrics

import (
remappers "github.com/elastic/opentelemetry-lib/remappers/internal"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func addClusterMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
dataset string,
) error {
var timestamp pcommon.Timestamp
var node_allocatable_memory, node_allocatable_cpu int64

// iterate all metrics in the current scope and generate the additional Elastic kubernetes integration metrics
for i := 0; i < src.Len(); i++ {
metric := src.At(i)
if metric.Name() == "k8s.node.allocatable_cpu" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about making these constants on top level? This would help us easily identify metric names, remappings etc on top level.

Copy link
Contributor Author

@gizas gizas Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See answer below #20 (comment)

dp := metric.Gauge().DataPoints().At(0)
if timestamp == 0 {
timestamp = dp.Timestamp()
}
node_allocatable_cpu = dp.IntValue()
} else if metric.Name() == "k8s.node.allocatable_memory" {
dp := metric.Gauge().DataPoints().At(0)
if timestamp == 0 {
timestamp = dp.Timestamp()
}
node_allocatable_memory = dp.IntValue()
}
}

remappers.AddMetrics(out, dataset, remappers.EmptyMutator,
remappers.Metric{
DataType: pmetric.MetricTypeGauge,
Name: "kubernetes.node.cpu.allocatable.cores",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we define these as constants as well and maybe define the "remapping" definition in a golang map? If the map makes the result too ugly then we can just go with the constants.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@ChrsMark ChrsMark Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that only the datasets where moved not the metric names 🤔 .
In any case, since this is mostly for code readability it can be done/considered in a follow-up to unblock the release.

Timestamp: timestamp,
IntValue: &node_allocatable_cpu,
},
remappers.Metric{
DataType: pmetric.MetricTypeGauge,
Name: "kubernetes.node.memory.allocatable.bytes",
Timestamp: timestamp,
IntValue: &node_allocatable_memory,
},
)

return nil
}
41 changes: 41 additions & 0 deletions remappers/kubernetesmetrics/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kubernetesmetrics

type config struct {
KubernetesIntegrationDataset bool
}

// Option allows configuring the behavior of the kubernetes remapper.
type Option func(config) config

func newConfig(opts ...Option) (cfg config) {
for _, opt := range opts {
cfg = opt(cfg)
}
return cfg
}

// WithKubernetesIntegrationDataset sets the dataset of the remapped metrics as
// as per the kubernetes integration. Example: kubernetes.pod
func WithKubernetesIntegrationDataset(b bool) Option {
return func(c config) config {
c.KubernetesIntegrationDataset = b
return c
}
}
51 changes: 51 additions & 0 deletions remappers/kubernetesmetrics/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kubernetesmetrics

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfig(t *testing.T) {
for _, tc := range []struct {
name string
opts []Option
expected config
}{
{
name: "default",
opts: nil,
expected: config{
KubernetesIntegrationDataset: false,
},
},
{
name: "k8s_integration_dataset",
opts: []Option{WithKubernetesIntegrationDataset(true)},
expected: config{
KubernetesIntegrationDataset: true,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expected, newConfig(tc.opts...))
})
}
}
104 changes: 104 additions & 0 deletions remappers/kubernetesmetrics/k8smetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kubernetesmetrics

import (
"path"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

var scraperToElasticDataset = map[string]string{
"kubeletstatsreceiver": "kubernetes.pod",
"k8sclusterreceiver": "kubernetes.node",
}

type remapFunc func(metrics pmetric.MetricSlice, out pmetric.MetricSlice, resource pcommon.Resource, dataset string) error

// Remapper maps the OTel Kubernetes to Elastic Kubernetes metrics. These remapped
// metrics power the curated Kibana dashboards. Each datapoint translated using
// the remapper has the `event.processor` attribute set to `kubernetes`.
type Remapper struct {
logger *zap.Logger
cfg config
}

// NewRemapper creates a new instance of kubernetes remapper.
func NewRemapper(logger *zap.Logger, opts ...Option) *Remapper {
return &Remapper{
cfg: newConfig(opts...),
logger: logger,
}
}

var remapFuncs = map[string]remapFunc{
"kubeletstatsreceiver": addKubeletMetrics,
"k8sclusterreceiver": addClusterMetrics,
}

// Remap remaps an OTel ScopeMetrics to a list of OTel metrics such that the
// remapped metrics could be trivially converted into Elastic system metrics.
// It accepts the resource attributes to enrich the remapped metrics as per
// Elastic convention. The current remapping logic assumes that each Metric
// in the ScopeMetric will have datapoints for a single timestamp only. The
// remapped metrics are added to the output `MetricSlice`.
func (r *Remapper) Remap(
src pmetric.ScopeMetrics,
out pmetric.MetricSlice,
resource pcommon.Resource,
) {
if !r.Valid(src) {
return
}

scope := src.Scope()
scraper := path.Base(scope.Name())

var dataset string // an empty dataset defers setting dataset to the caller
if r.cfg.KubernetesIntegrationDataset {
var ok bool
dataset, ok = scraperToElasticDataset[scraper]
if !ok {
r.logger.Warn("no dataset defined for scraper", zap.String("scraper", scraper))
return
}
}

remapFunc, ok := remapFuncs[scraper]
if !ok {
return
}

err := remapFunc(src.Metrics(), out, resource, dataset)
if err != nil {
r.logger.Warn(
"failed to remap OTel kubernetes",
zap.String("scope", scope.Name()),
zap.Error(err),
)
}
}

// Valid validates a ScopeMetric against the kubernetes metrics remapper requirements.
// Kubernetes remapper only remaps metrics from kubeletstatsreceiver or k8sclusterreceiver.
func (r *Remapper) Valid(sm pmetric.ScopeMetrics) bool {
return strings.HasPrefix(sm.Scope().Name(), "otelcol/kubeletstatsreceiver") || strings.HasPrefix(sm.Scope().Name(), "otelcol/k8sclusterreceiver")
}
Loading