Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[awsemfexporter] Implement metric filtering and dimension setting #1503

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ The following exporter configuration parameters are supported.
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |

| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |

### <metric_declaration>
A metric_declaration section characterizes a rule to be used to set dimensions for exported metrics, filtered by the incoming metrics' metric names.
| Name | Description | Default |
| :---------------- | :--------------------------------------------------------------------- | ------- |
| `dimensions` | List of dimension sets to be exported. | [[ ]] |
| `metric_name_selectors` | List of regex strings to filter metric names by. | [ ] |


## AWS Credential Configuration

Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Config struct {
// "SingleDimensionRollupOnly" - Enable single dimension rollup
// "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions)
DimensionRollupOption string `mapstructure:"dimension_rollup_option"`
// MetricDeclarations is a list of rules to be used to set dimensions for exported metrics.
MetricDeclarations []*MetricDeclaration `mapstructure:"metric_declarations"`

// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestLoadConfig(t *testing.T) {
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: []*MetricDeclaration{},
})

r2 := cfg.Exporters["awsemf/resource_attr_to_label"].(*Config)
Expand All @@ -75,5 +76,6 @@ func TestLoadConfig(t *testing.T) {
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
MetricDeclarations: []*MetricDeclaration{},
})
}
15 changes: 14 additions & 1 deletion exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ func New(
svcStructuredLog := NewCloudWatchLogsClient(logger, awsConfig, session)
collectorIdentifier, _ := uuid.NewRandom()

// Initialize metric declarations and filter out invalid ones
emfConfig := config.(*Config)
var validDeclarations []*MetricDeclaration
for _, declaration := range emfConfig.MetricDeclarations {
err := declaration.Init(logger)
if err != nil {
logger.Warn("Dropped metric declaration. Error: " + err.Error() + ".")
} else {
validDeclarations = append(validDeclarations, declaration)
}
}
emfConfig.MetricDeclarations = validDeclarations

emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
Expand Down Expand Up @@ -212,7 +225,7 @@ func generateLogEventFromMetric(metric pdata.Metrics, config *Config) ([]*LogEve
cwMetricLists = append(cwMetricLists, cwm...)
}

return TranslateCWMetricToEMF(cwMetricLists), totalDroppedMetrics, namespace
return TranslateCWMetricToEMF(cwMetricLists, config.logger), totalDroppedMetrics, namespace
}

func wrapErrorIfBadRequest(err *error) error {
Expand Down
57 changes: 57 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/translator/internaldata"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

func init() {
Expand Down Expand Up @@ -236,6 +238,61 @@ func TestNewExporterWithoutConfig(t *testing.T) {
assert.NotNil(t, expCfg.logger)
}

func TestNewExporterWithMetricDeclarations(t *testing.T) {
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.MaxRetries = defaultRetryCount
expCfg.LogGroupName = "test-logGroupName"
expCfg.LogStreamName = "test-logStreamName"
mds := []*MetricDeclaration{
{
MetricNameSelectors: []string{"a", "b"},
},
{
MetricNameSelectors: []string{"c", "d"},
},
{
MetricNameSelectors: nil,
},
{
Dimensions: [][]string{
{"foo"},
{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"},
},
MetricNameSelectors: []string{"a"},
},
}
expCfg.MetricDeclarations = mds

obs, logs := observer.New(zap.WarnLevel)
logger := zap.New(obs)
exp, err := New(expCfg, component.ExporterCreateParams{Logger: logger})
assert.Nil(t, err)
assert.NotNil(t, exp)

emfExporter := exp.(*emfExporter)
config := emfExporter.config.(*Config)
// Invalid metric declaration should be filtered out
assert.Equal(t, 3, len(config.MetricDeclarations))
// Invalid dimensions (> 10 dims) should be filtered out
assert.Equal(t, 1, len(config.MetricDeclarations[2].Dimensions))

// Test output warning logs
expectedLogs := []observer.LoggedEntry{
{
Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped metric declaration. Error: invalid metric declaration: no metric name selectors defined."},
Context: []zapcore.Field{},
},
{
Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped dimension set: > 10 dimensions specified."},
Context: []zapcore.Field{zap.String("dimensions", "a,b,c,d,e,f,g,h,i,j,k")},
},
}
assert.Equal(t, 2, logs.Len())
assert.Equal(t, expectedLogs, logs.AllUntimed())
}

func TestNewExporterWithoutSession(t *testing.T) {
exp, err := New(nil, component.ExporterCreateParams{Logger: zap.NewNop()})
assert.NotNil(t, err)
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func createDefaultConfig() configmodels.Exporter {
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: make([]*MetricDeclaration, 0),
logger: nil,
}
}
Expand Down
171 changes: 171 additions & 0 deletions exporter/awsemfexporter/metric_declaration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awsemfexporter

import (
"errors"
"regexp"
"sort"
"strings"

"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

// MetricDeclaration characterizes a rule to be used to set dimensions for certain
// incoming metrics, filtered by their metric names.
type MetricDeclaration struct {
// Dimensions is a list of dimension sets (which are lists of dimension names) to be
// included in exported metrics. If the metric does not contain any of the specified
// dimensions, the metric would be dropped (will only show up in logs).
Dimensions [][]string `mapstructure:"dimensions"`
// MetricNameSelectors is a list of regex strings to be matched against metric names
// to determine which metrics should be included with this metric declaration rule.
MetricNameSelectors []string `mapstructure:"metric_name_selectors"`

// metricRegexList is a list of compiled regexes for metric name selectors.
metricRegexList []*regexp.Regexp
}

// Remove duplicated entries from dimension set.
func dedupDimensionSet(dimensions []string) (deduped []string, hasDuplicate bool) {
seen := make(map[string]bool, len(dimensions))
for _, v := range dimensions {
seen[v] = true
}
hasDuplicate = (len(seen) < len(dimensions))
if !hasDuplicate {
deduped = dimensions
return
}
deduped = make([]string, len(seen))
idx := 0
for dim := range seen {
deduped[idx] = dim
idx++
}
return
}

// Init initializes the MetricDeclaration struct. Performs validation and compiles
// regex strings. Dimensions are deduped and sorted.
func (m *MetricDeclaration) Init(logger *zap.Logger) (err error) {
// Return error if no metric name selectors are defined
if len(m.MetricNameSelectors) == 0 {
return errors.New("invalid metric declaration: no metric name selectors defined")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a user inputs an incorrect config, is this the log message they will see? Is it enough information to fix the config issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They would see the message "Dropped metric declaration. Error: invalid metric declaration: no metric name selectors defined.". I think this is sufficient as it tells the user that a metric has been dropped due to no metric name selectors defined. I'm open to suggestions on how to make this better though

}

// Filter out duplicate dimension sets and those with more than 10 elements
validDims := make([][]string, 0, len(m.Dimensions))
seen := make(map[string]bool, len(m.Dimensions))
for _, dimSet := range m.Dimensions {
concatenatedDims := strings.Join(dimSet, ",")
if len(dimSet) > 10 {
logger.Warn("Dropped dimension set: > 10 dimensions specified.", zap.String("dimensions", concatenatedDims))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, these warnings only happen on startup? Even so we might use debug logging, I think collector generally leans towards less logging messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, this only happens on startup. In this case, would it be ok to use Warn since it's only once on startup? I feel that this information is useful for the user to fix in case they don't enable Debug level logging.

continue
}

// Dedup dimensions within dimension set
dedupedDims, hasDuplicate := dedupDimensionSet(dimSet)
if hasDuplicate {
logger.Warn("Removed duplicates from dimension set.", zap.String("dimensions", concatenatedDims))
}

// Sort dimensions
sort.Strings(dedupedDims)

// Dedup dimension sets
key := strings.Join(dedupedDims, ",")
if _, ok := seen[key]; ok {
logger.Warn("Dropped dimension set: duplicated dimension set.", zap.String("dimensions", concatenatedDims))
continue
}
seen[key] = true
validDims = append(validDims, dedupedDims)
}
m.Dimensions = validDims

m.metricRegexList = make([]*regexp.Regexp, len(m.MetricNameSelectors))
for i, selector := range m.MetricNameSelectors {
m.metricRegexList[i] = regexp.MustCompile(selector)
}
return
}

// Matches returns true if the given OTLP Metric's name matches any of the Metric
// Declaration's metric name selectors.
func (m *MetricDeclaration) Matches(metric *pdata.Metric) bool {
for _, regex := range m.metricRegexList {
if regex.MatchString(metric.Name()) {
return true
}
}
return false
}

// ExtractDimensions extracts dimensions within the MetricDeclaration that only
// contains labels found in `labels`. Returned order of dimensions are preserved.
func (m *MetricDeclaration) ExtractDimensions(labels map[string]string) (dimensions [][]string) {
for _, dimensionSet := range m.Dimensions {
if len(dimensionSet) == 0 {
continue
}
includeSet := true
for _, dim := range dimensionSet {
if _, ok := labels[dim]; !ok {
includeSet = false
break
}
}
if includeSet {
dimensions = append(dimensions, dimensionSet)
}
}
return
}

// processMetricDeclarations processes a list of MetricDeclarations and creates a
// list of dimension sets that matches the given `metric`. This list is then aggregated
// together with the rolled-up dimensions. Returned dimension sets
// are deduped and the dimensions in each dimension set are sorted.
// Prerequisite:
// 1. metricDeclarations' dimensions are sorted.
func processMetricDeclarations(metricDeclarations []*MetricDeclaration, metric *pdata.Metric,
labels map[string]string, rolledUpDimensions [][]string) (dimensions [][]string) {
seen := make(map[string]bool)
addDimSet := func(dimSet []string) {
key := strings.Join(dimSet, ",")
// Only add dimension set if not a duplicate
if _, ok := seen[key]; !ok {
dimensions = append(dimensions, dimSet)
seen[key] = true
}
}
// Extract and append dimensions from metric declarations
for _, m := range metricDeclarations {
if m.Matches(metric) {
extractedDims := m.ExtractDimensions(labels)
for _, dimSet := range extractedDims {
addDimSet(dimSet)
}
}
}
// Add on rolled-up dimensions
for _, dimSet := range rolledUpDimensions {
sort.Strings(dimSet)
addDimSet(dimSet)
}
return
}
Loading