Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a config to decode json-encoded strings in attribute values #2827

Merged
merged 3 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The following exporter configuration parameters are supported.
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}```| [ ] |
| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]|

Expand Down
4 changes: 4 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type Config struct {
// "SingleDimensionRollupOnly" - Enable single dimension rollup
// "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions)
DimensionRollupOption string `mapstructure:"dimension_rollup_option"`
// ParseJSONEncodedAttributeValues is an array of attribute keys whose corresponding values are JSON-encoded as strings.
// Those strings will be decoded to its original json structure.
ParseJSONEncodedAttributeValues []string `mapstructure:"parse_json_encoded_attr_values"`

// MetricDeclarations is the list of rules to be used to set dimensions for exported metrics.
MetricDeclarations []*MetricDeclaration `mapstructure:"metric_declarations"`

Expand Down
56 changes: 29 additions & 27 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,39 +49,41 @@ func TestLoadConfig(t *testing.T) {
r1.Validate()
assert.Equal(t,
&Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/1"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/1"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
}, r1)

r2 := cfg.Exporters["awsemf/resource_attr_to_label"].(*Config)
r2.Validate()
assert.Equal(t, r2,
&Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/resource_attr_to_label"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/resource_attr_to_label"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
})
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err

for _, groupedMetric := range groupedMetrics {
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig)
putLogEvent := translateCWMetricToEMF(cWMetric)
putLogEvent := translateCWMetricToEMF(cWMetric, expConfig)

logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
Expand Down
27 changes: 14 additions & 13 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,20 @@ func createDefaultConfig() configmodels.Exporter {
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
LogGroupName: "",
LogStreamName: "",
Namespace: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: make([]*MetricDeclaration, 0),
logger: nil,
LogGroupName: "",
LogStreamName: "",
Namespace: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: make([]*MetricDeclaration, 0),
logger: nil,
}
}

Expand Down
31 changes: 30 additions & 1 deletion exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package awsemfexporter
import (
"encoding/json"
"fmt"
"reflect"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
Expand Down Expand Up @@ -332,11 +333,39 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *GroupedMetric, conf
}

// translateCWMetricToEMF converts CloudWatch Metric format to EMF.
func translateCWMetricToEMF(cWMetric *CWMetrics) *LogEvent {
func translateCWMetricToEMF(cWMetric *CWMetrics, config *Config) *LogEvent {
// convert CWMetric into map format for compatible with PLE input
cWMetricMap := make(map[string]interface{})
fieldMap := cWMetric.Fields

//restore the json objects that are stored as string in attributes
for _, key := range config.ParseJSONEncodedAttributeValues {
if fieldMap[key] == nil {
continue
}

if val, ok := fieldMap[key].(string); ok {
var f interface{}
err := json.Unmarshal([]byte(val), &f)
if err != nil {
config.logger.Debug(
"Failed to parse json-encoded string",
zap.String("label key", key),
zap.String("label value", val),
zap.Error(err),
)
continue
}
fieldMap[key] = f
} else {
config.logger.Debug(
"Invalid json-encoded data. A string is expected",
zap.Any("type", reflect.TypeOf(fieldMap[key])),
zap.Any("value", reflect.ValueOf(fieldMap[key])),
)
}
}

// Create `_aws` section only if there are measurements
if len(cWMetric.Measurements) > 0 {
// Create `_aws` section only if there are measurements
Expand Down
15 changes: 12 additions & 3 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,22 @@ func TestTranslateCWMetricToEMF(t *testing.T) {
fields[oTellibDimensionKey] = "cloudwatch-otel"
fields["spanName"] = "test"
fields["spanCounter"] = 0
//add stringified json as attribute values
fields["kubernetes"] = "{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}"
fields["Sources"] = "[\"cadvisor\",\"pod\",\"calculated\"]"

config := &Config{
//include valid json string, a non-existing key, and keys whose value are not json/string
ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"},
logger: zap.NewNop(),
}

met := &CWMetrics{
TimestampMs: timestamp,
Fields: fields,
Measurements: []CWMeasurement{cwMeasurement},
}
inputLogEvent := translateCWMetricToEMF(met)
inputLogEvent := translateCWMetricToEMF(met, config)

assert.Equal(t, readFromFile("testdata/testTranslateCWMetricToEMF.json"), *inputLogEvent.InputLogEvent.Message, "Expect to be equal")
}
Expand Down Expand Up @@ -1994,7 +2003,7 @@ func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) {
Fields: fields,
Measurements: nil,
}
inputLogEvent := translateCWMetricToEMF(met)
inputLogEvent := translateCWMetricToEMF(met, &Config{})
expected := "{\"OTelLib\":\"cloudwatch-otel\",\"spanCounter\":0,\"spanName\":\"test\"}"

assert.Equal(t, expected, *inputLogEvent.InputLogEvent.Message)
Expand Down Expand Up @@ -2132,6 +2141,6 @@ func BenchmarkTranslateCWMetricToEMF(b *testing.B) {

b.ResetTimer()
for n := 0; n < b.N; n++ {
translateCWMetricToEMF(met)
translateCWMetricToEMF(met, &Config{})
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"OTelLib":"cloudwatch-otel","_aws":{"CloudWatchMetrics":[{"Namespace":"test-emf","Dimensions":[["OTelLib"],["OTelLib","spanName"]],"Metrics":[{"Name":"spanCounter","Unit":"Count"}]}],"Timestamp":1596151098037},"spanCounter":0,"spanName":"test"}
{"OTelLib":"cloudwatch-otel","Sources":["cadvisor","pod","calculated"],"_aws":{"CloudWatchMetrics":[{"Namespace":"test-emf","Dimensions":[["OTelLib"],["OTelLib","spanName"]],"Metrics":[{"Name":"spanCounter","Unit":"Count"}]}],"Timestamp":1596151098037},"kubernetes":{"container_name":"cloudwatch-agent","docker":{"container_id":"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca"},"host":"ip-192-168-58-245.ec2.internal","labels":{"controller-revision-hash":"5bdbf497dc","name":"cloudwatch-agent","pod-template-generation":"1"},"namespace_name":"amazon-cloudwatch","pod_id":"e23f3413-af2e-4a98-89e0-5df2251e7f05","pod_name":"cloudwatch-agent-26bl6","pod_owners":[{"owner_kind":"DaemonSet","owner_name":"cloudwatch-agent"}]},"spanCounter":0,"spanName":"test"}