
[exporter/signalfx] Support removal of direction attribute #12642

Closed
1 change: 1 addition & 0 deletions exporter/signalfxexporter/README.md
@@ -156,6 +156,7 @@
The `translation_rules` metrics configuration field accepts a list of metric-translation rules that help ensure compatibility with custom charts and dashboards when using the OpenTelemetry Collector. It also provides the ability to produce custom metrics by copying, calculating new, or aggregating other metric values without requiring an additional processor.
The rule language is expressed in yaml mappings and is [documented here](./internal/translation/translator.go). Translation rules currently allow the following actions:

* `add_dimensions` - Adds dimensions for specified metrics, or globally
* `aggregate_metric` - Aggregates a metric through removal of specified dimensions
* `calculate_new_metric` - Creates a new metric by operating on two constituent ones
* `convert_values` - Convert float values to int or int to float for specified metric names
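As a rough illustration, an `aggregate_metric` rule that sums away the `direction` dimension might look like the following sketch. The field names (`aggregation_method`, `without_dimensions`) are taken from the translator's rule schema; check the linked translator documentation for the authoritative shape before relying on them:

```yaml
exporters:
  signalfx:
    translation_rules:
      - action: aggregate_metric
        metric_name: system.disk.operations
        aggregation_method: sum
        without_dimensions:
          - direction
```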
188 changes: 104 additions & 84 deletions exporter/signalfxexporter/factory_test.go
@@ -210,17 +210,17 @@ func TestDefaultTranslationRules(t *testing.T) {

// system.network.operations.total new metric calculation
dps, ok = metrics["system.disk.operations.total"]
require.True(t, ok, "system.network.operations.total metrics not found")
require.True(t, ok, "system.disk.operations.total metrics not found")
require.Len(t, dps, 4)
require.Equal(t, 2, len(dps[0].Dimensions))

// system.network.io.total new metric calculation
dps, ok = metrics["system.disk.io.total"]
require.True(t, ok, "system.network.io.total metrics not found")
require.True(t, ok, "system.disk.io.total metrics not found")
require.Len(t, dps, 2)
require.Equal(t, 2, len(dps[0].Dimensions))
for _, dp := range dps {
require.Equal(t, "direction", dp.Dimensions[0].Key)
require.Equal(t, "direction", dp.Dimensions[1].Key)
crobert-1 marked this conversation as resolved.
switch dp.Dimensions[1].Value {
case "write":
require.Equal(t, int64(11e9), *dp.Value.IntValue)
@@ -231,13 +231,20 @@ func TestDefaultTranslationRules(t *testing.T) {

// disk_ops.total gauge from system.disk.operations cumulative, where disk_ops.total
// is the cumulative across devices and directions.
// The test uses 4 metrics, each with more than 1 data point. Aggregating by sum reduces each metric to 1 data point.
// The 4 resulting data points yield 3 delta data points: each delta is either the increase over the previous
// data point, or the current data point's value (if it is smaller than the previous one).
Comment on lines +234 to +236
Review comment (Member):
I don't think this is an acceptable solution. We need to make sure that users see the same values in the charts built with disk_ops.total metric. I don't think it's going to be true if we break down disk_ops.total into several delta metrics. Please correct me if I'm wrong.

If it indeed introduces different results, I see the following options to restore the same disk_ops.total metric:

  1. Introduce a metricstransform processor in the default Splunk Otel distribution config to get a metric combined from system.disk.operations.read and system.disk.operations.write.
  2. Refactor the signalfx exporter translation logic to work on a metrics batch instead of just one metric.

1 approach seems simpler and more reliable to me.
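A minimal sketch of option 1, assuming the metricstransform processor's `combine` action with a `regexp` match (the exact field names should be verified against that processor's README):

```yaml
processors:
  metricstransform:
    transforms:
      - include: ^system\.disk\.operations\.(read|write)$
        match_type: regexp
        action: combine
        new_name: system.disk.operations
```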

@crobert-1 (Member Author) commented on Jul 25, 2022:
Thanks for the pointers! I've brought this up as a potential concern in #12641 and had thought about possibly doing solution 2, but I was worried that it might have unintended side effects working on other metrics.

I'll look into option 1.

dps, ok = metrics["disk_ops.total"]
require.True(t, ok, "disk_ops.total metrics not found")
require.Len(t, dps, 1)
require.Equal(t, int64(8e3), *dps[0].Value.IntValue)
require.Equal(t, 1, len(dps[0].Dimensions))
require.Equal(t, "host", dps[0].Dimensions[0].Key)
require.Equal(t, "host0", dps[0].Dimensions[0].Value)
require.Len(t, dps, 3)
require.Equal(t, int64(4e3), *dps[0].Value.IntValue)
require.Equal(t, int64(4e3), *dps[1].Value.IntValue)
require.Equal(t, int64(10e3), *dps[2].Value.IntValue)
for _, dp := range dps {
require.Equal(t, 1, len(dp.Dimensions))
require.Equal(t, "host", dp.Dimensions[0].Key)
require.Equal(t, "host0", dp.Dimensions[0].Value)
}

// system.network.io.total new metric calculation
dps, ok = metrics["system.network.io.total"]
@@ -251,15 +258,19 @@ func TestDefaultTranslationRules(t *testing.T) {
require.Len(t, dps, 1)
require.Equal(t, 4, len(dps[0].Dimensions))
require.Equal(t, int64(350), *dps[0].Value.IntValue)
require.Equal(t, "direction", dps[0].Dimensions[0].Key)
require.Equal(t, "receive", dps[0].Dimensions[0].Value)
require.Equal(t, "direction", dps[0].Dimensions[3].Key)
require.Equal(t, "receive", dps[0].Dimensions[3].Value)

// network.total new metric calculation
dps, ok = metrics["network.total"]
require.True(t, ok, "network.total metrics not found")
require.Len(t, dps, 1)
// Expect two data points because two metrics are sent in. Aggregation is within a single metric, so we can't
// combine metrics.
require.Len(t, dps, 2)
require.Equal(t, 3, len(dps[0].Dimensions))
require.Equal(t, int64(10e9), *dps[0].Value.IntValue)
require.Equal(t, 3, len(dps[1].Dimensions))
require.Equal(t, int64(4e9), *dps[0].Value.IntValue)
require.Equal(t, int64(6e9), *dps[1].Value.IntValue)
}

func TestCreateMetricsExporterWithDefaultExcludeMetrics(t *testing.T) {
@@ -341,186 +352,195 @@ func testMetricsData() pmetric.Metrics {
dp12.SetIntVal(6e9)

m2 := ms.AppendEmpty()
m2.SetName("system.disk.io")
m2.SetName("system.disk.io.read")
m2.SetDescription("Disk I/O.")
m2.SetDataType(pmetric.MetricDataTypeSum)
m2.Sum().SetIsMonotonic(true)
m2.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp21 := m2.Sum().DataPoints().AppendEmpty()
dp21.Attributes().InsertString("host", "host0")
dp21.Attributes().InsertString("direction", "read")
dp21.Attributes().InsertString("device", "sda1")
dp21.Attributes().Sort()
dp21.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp21.SetIntVal(1e9)
dp22 := m2.Sum().DataPoints().AppendEmpty()
dp22.Attributes().InsertString("host", "host0")
dp22.Attributes().InsertString("direction", "read")
dp22.Attributes().InsertString("device", "sda2")
dp22.Attributes().Sort()
dp22.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp22.SetIntVal(2e9)
dp23 := m2.Sum().DataPoints().AppendEmpty()
m3 := ms.AppendEmpty()
m3.SetName("system.disk.io.write")
m3.SetDescription("Disk I/O.")
m3.SetDataType(pmetric.MetricDataTypeSum)
m3.Sum().SetIsMonotonic(true)
m3.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp23 := m3.Sum().DataPoints().AppendEmpty()
dp23.Attributes().InsertString("host", "host0")
dp23.Attributes().InsertString("direction", "write")
dp23.Attributes().InsertString("device", "sda1")
dp23.Attributes().Sort()
dp23.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp23.SetIntVal(3e9)
dp24 := m2.Sum().DataPoints().AppendEmpty()
dp24 := m3.Sum().DataPoints().AppendEmpty()
dp24.Attributes().InsertString("host", "host0")
dp24.Attributes().InsertString("direction", "write")
dp24.Attributes().InsertString("device", "sda2")
dp24.Attributes().Sort()
dp24.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp24.SetIntVal(8e9)

m3 := ms.AppendEmpty()
m3.SetName("system.disk.operations")
m3.SetDescription("Disk operations count.")
m3.SetUnit("bytes")
m3.SetDataType(pmetric.MetricDataTypeSum)
m3.Sum().SetIsMonotonic(true)
m3.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp31 := m3.Sum().DataPoints().AppendEmpty()
dp31.Attributes().InsertString("host", "host0")
dp31.Attributes().InsertString("direction", "write")
dp31.Attributes().InsertString("device", "sda1")
dp31.Attributes().Sort()
dp31.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp31.SetIntVal(4e3)
dp32 := m3.Sum().DataPoints().AppendEmpty()
m6 := ms.AppendEmpty()
m6.SetName("system.disk.operations.read")
m6.SetDescription("Disk operations read count.")
m6.SetUnit("bytes")
m6.SetDataType(pmetric.MetricDataTypeSum)
m6.Sum().SetIsMonotonic(true)
m6.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp32 := m6.Sum().DataPoints().AppendEmpty()
dp32.Attributes().InsertString("host", "host0")
dp32.Attributes().InsertString("direction", "read")
dp32.Attributes().InsertString("device", "sda2")
dp32.Attributes().Sort()
dp32.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp32.SetIntVal(6e3)
dp33 := m3.Sum().DataPoints().AppendEmpty()
m7 := ms.AppendEmpty()
m7.SetName("system.disk.operations.write")
m7.SetDescription("Disk operations write count.")
m7.SetUnit("bytes")
m7.SetDataType(pmetric.MetricDataTypeSum)
m7.Sum().SetIsMonotonic(true)
m7.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp31 := m7.Sum().DataPoints().AppendEmpty()
dp31.Attributes().InsertString("host", "host0")
dp31.Attributes().InsertString("device", "sda1")
dp31.Attributes().Sort()
dp31.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp31.SetIntVal(4e3)
dp33 := m7.Sum().DataPoints().AppendEmpty()
dp33.Attributes().InsertString("host", "host0")
dp33.Attributes().InsertString("direction", "write")
dp33.Attributes().InsertString("device", "sda1")
dp33.Attributes().Sort()
dp33.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp33.SetIntVal(1e3)
dp34 := m3.Sum().DataPoints().AppendEmpty()
dp34 := m7.Sum().DataPoints().AppendEmpty()
dp34.Attributes().InsertString("host", "host0")
dp34.Attributes().InsertString("direction", "write")
dp34.Attributes().InsertString("device", "sda2")
dp34.Attributes().Sort()
dp34.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp34.SetIntVal(5e3)

m4 := ms.AppendEmpty()
m4.SetName("system.disk.operations")
m4.SetDescription("Disk operations count.")
m4.SetUnit("bytes")
m4.SetDataType(pmetric.MetricDataTypeSum)
m4.Sum().SetIsMonotonic(true)
m4.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp41 := m4.Sum().DataPoints().AppendEmpty()
m8 := ms.AppendEmpty()
m8.SetName("system.disk.operations.read")
m8.SetDescription("Disk operations read count.")
m8.SetUnit("bytes")
m8.SetDataType(pmetric.MetricDataTypeSum)
m8.Sum().SetIsMonotonic(true)
m8.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp41 := m8.Sum().DataPoints().AppendEmpty()
dp41.Attributes().InsertString("host", "host0")
dp41.Attributes().InsertString("direction", "read")
dp41.Attributes().InsertString("device", "sda1")
dp41.Attributes().Sort()
dp41.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0)))
dp41.SetIntVal(6e3)
dp42 := m4.Sum().DataPoints().AppendEmpty()
dp42 := m8.Sum().DataPoints().AppendEmpty()
dp42.Attributes().InsertString("host", "host0")
dp42.Attributes().InsertString("direction", "read")
dp42.Attributes().InsertString("device", "sda2")
dp42.Attributes().Sort()
dp42.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0)))
dp42.SetIntVal(8e3)
dp43 := m4.Sum().DataPoints().AppendEmpty()
m9 := ms.AppendEmpty()
m9.SetName("system.disk.operations.write")
m9.SetDescription("Disk operations write count.")
m9.SetUnit("bytes")
m9.SetDataType(pmetric.MetricDataTypeSum)
m9.Sum().SetIsMonotonic(true)
m9.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
dp43 := m9.Sum().DataPoints().AppendEmpty()
dp43.Attributes().InsertString("host", "host0")
dp43.Attributes().InsertString("direction", "write")
dp43.Attributes().InsertString("device", "sda1")
dp43.Attributes().Sort()
dp43.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0)))
dp43.SetIntVal(3e3)
dp44 := m4.Sum().DataPoints().AppendEmpty()
dp44 := m9.Sum().DataPoints().AppendEmpty()
dp44.Attributes().InsertString("host", "host0")
dp44.Attributes().InsertString("direction", "write")
dp44.Attributes().InsertString("device", "sda2")
dp44.Attributes().Sort()
dp44.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0)))
dp44.SetIntVal(7e3)

m5 := ms.AppendEmpty()
m5.SetName("system.network.io")
m5.SetDescription("The number of bytes transmitted and received")
m5.SetUnit("bytes")
m5.SetDataType(pmetric.MetricDataTypeGauge)
dp51 := m5.Gauge().DataPoints().AppendEmpty()
m10 := ms.AppendEmpty()
m10.SetName("system.network.io.receive")
m10.SetDescription("The number of bytes received")
m10.SetUnit("bytes")
m10.SetDataType(pmetric.MetricDataTypeGauge)
dp51 := m10.Gauge().DataPoints().AppendEmpty()
dp51.Attributes().InsertString("host", "host0")
dp51.Attributes().InsertString("direction", "receive")
dp51.Attributes().InsertString("device", "eth0")
dp51.Attributes().InsertString("kubernetes_node", "node0")
dp51.Attributes().InsertString("kubernetes_cluster", "cluster0")
dp51.Attributes().Sort()
dp51.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp51.SetIntVal(4e9)
dp52 := m5.Gauge().DataPoints().AppendEmpty()
m11 := ms.AppendEmpty()
m11.SetName("system.network.io.transmit")
m11.SetDescription("The number of bytes transmitted")
m11.SetUnit("bytes")
m11.SetDataType(pmetric.MetricDataTypeGauge)
dp52 := m11.Gauge().DataPoints().AppendEmpty()
dp52.Attributes().InsertString("host", "host0")
dp52.Attributes().InsertString("direction", "transmit")
dp52.Attributes().InsertString("device", "eth0")
dp52.Attributes().InsertString("kubernetes_node", "node0")
dp52.Attributes().InsertString("kubernetes_cluster", "cluster0")
dp52.Attributes().Sort()
dp52.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp52.SetIntVal(6e9)

m6 := ms.AppendEmpty()
m6.SetName("system.network.packets")
m6.SetDescription("The number of packets transferred")
m6.SetDataType(pmetric.MetricDataTypeGauge)
dp61 := m6.Gauge().DataPoints().AppendEmpty()
m12 := ms.AppendEmpty()
m12.SetName("system.network.packets.receive")
m12.SetDescription("The number of packets received")
m12.SetDataType(pmetric.MetricDataTypeGauge)
dp61 := m12.Gauge().DataPoints().AppendEmpty()
dp61.Attributes().InsertString("host", "host0")
dp61.Attributes().InsertString("direction", "receive")
dp61.Attributes().InsertString("device", "eth0")
dp61.Attributes().InsertString("kubernetes_node", "node0")
dp61.Attributes().InsertString("kubernetes_cluster", "cluster0")
dp61.Attributes().Sort()
dp61.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp61.SetIntVal(200)
dp62 := m6.Gauge().DataPoints().AppendEmpty()
dp62 := m12.Gauge().DataPoints().AppendEmpty()
dp62.Attributes().InsertString("host", "host0")
dp62.Attributes().InsertString("direction", "receive")
dp62.Attributes().InsertString("device", "eth1")
dp62.Attributes().InsertString("kubernetes_node", "node0")
dp62.Attributes().InsertString("kubernetes_cluster", "cluster0")
dp62.Attributes().Sort()
dp62.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp62.SetIntVal(150)

m7 := ms.AppendEmpty()
m7.SetName("container.memory.working_set")
m7.SetUnit("bytes")
m7.SetDataType(pmetric.MetricDataTypeGauge)
dp71 := m7.Gauge().DataPoints().AppendEmpty()
m14 := ms.AppendEmpty()
m14.SetName("container.memory.working_set")
m14.SetUnit("bytes")
m14.SetDataType(pmetric.MetricDataTypeGauge)
dp71 := m14.Gauge().DataPoints().AppendEmpty()
dp71.Attributes().InsertString("host", "host0")
dp71.Attributes().InsertString("kubernetes_node", "node0")
dp71.Attributes().InsertString("kubernetes_cluster", "cluster0")
dp71.Attributes().Sort()
dp71.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp71.SetIntVal(1000)

m8 := ms.AppendEmpty()
m8.SetName("container.memory.page_faults")
m8.SetDataType(pmetric.MetricDataTypeGauge)
dp81 := m8.Gauge().DataPoints().AppendEmpty()
m16 := ms.AppendEmpty()
m16.SetName("container.memory.page_faults")
m16.SetDataType(pmetric.MetricDataTypeGauge)
dp81 := m16.Gauge().DataPoints().AppendEmpty()
dp81.Attributes().InsertString("host", "host0")
dp81.Attributes().InsertString("kubernetes_node", "node0")
dp81.Attributes().InsertString("kubernetes_cluster", "cluster0")
dp81.Attributes().Sort()
dp81.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)))
dp81.SetIntVal(1000)

m9 := ms.AppendEmpty()
m9.SetName("container.memory.major_page_faults")
m9.SetDataType(pmetric.MetricDataTypeGauge)
dp91 := m9.Gauge().DataPoints().AppendEmpty()
m20 := ms.AppendEmpty()
m20.SetName("container.memory.major_page_faults")
m20.SetDataType(pmetric.MetricDataTypeGauge)
dp91 := m20.Gauge().DataPoints().AppendEmpty()
dp91.Attributes().InsertString("host", "host0")
dp91.Attributes().InsertString("kubernetes_node", "node0")
dp91.Attributes().InsertString("kubernetes_cluster", "cluster0")