Skip to content

Commit

Permalink
Add a config to decode json-encoded strings in attribute values (#2827)
Browse files Browse the repository at this point in the history
* decoding json-encoded attribute values

* update config unit tests

* cover exception cases
  • Loading branch information
pxaws authored and pmatyjasek-sumo committed Apr 28, 2021
1 parent bef4d3a commit 94e7ff1
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 46 deletions.
1 change: 1 addition & 0 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The following exporter configuration parameters are supported.
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}```| [ ] |
| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]|

Expand Down
4 changes: 4 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type Config struct {
// "SingleDimensionRollupOnly" - Enable single dimension rollup
// "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions)
DimensionRollupOption string `mapstructure:"dimension_rollup_option"`
// ParseJSONEncodedAttributeValues is an array of attribute keys whose corresponding values are JSON-encoded as strings.
// Those strings will be decoded to its original json structure.
ParseJSONEncodedAttributeValues []string `mapstructure:"parse_json_encoded_attr_values"`

// MetricDeclarations is the list of rules to be used to set dimensions for exported metrics.
MetricDeclarations []*MetricDeclaration `mapstructure:"metric_declarations"`

Expand Down
56 changes: 29 additions & 27 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,39 +49,41 @@ func TestLoadConfig(t *testing.T) {
r1.Validate()
assert.Equal(t,
&Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/1"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/1"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
}, r1)

r2 := cfg.Exporters["awsemf/resource_attr_to_label"].(*Config)
r2.Validate()
assert.Equal(t, r2,
&Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/resource_attr_to_label"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "awsemf/resource_attr_to_label"},
LogGroupName: "",
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
})
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err

for _, groupedMetric := range groupedMetrics {
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig)
putLogEvent := translateCWMetricToEMF(cWMetric)
putLogEvent := translateCWMetricToEMF(cWMetric, expConfig)

logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
Expand Down
27 changes: 14 additions & 13 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,20 @@ func createDefaultConfig() configmodels.Exporter {
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
LogGroupName: "",
LogStreamName: "",
Namespace: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: make([]*MetricDeclaration, 0),
logger: nil,
LogGroupName: "",
LogStreamName: "",
Namespace: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: make([]*MetricDeclaration, 0),
logger: nil,
}
}

Expand Down
31 changes: 30 additions & 1 deletion exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package awsemfexporter
import (
"encoding/json"
"fmt"
"reflect"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
Expand Down Expand Up @@ -332,11 +333,39 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *GroupedMetric, conf
}

// translateCWMetricToEMF converts CloudWatch Metric format to EMF.
func translateCWMetricToEMF(cWMetric *CWMetrics) *LogEvent {
func translateCWMetricToEMF(cWMetric *CWMetrics, config *Config) *LogEvent {
// convert CWMetric into map format for compatible with PLE input
cWMetricMap := make(map[string]interface{})
fieldMap := cWMetric.Fields

//restore the json objects that are stored as string in attributes
for _, key := range config.ParseJSONEncodedAttributeValues {
if fieldMap[key] == nil {
continue
}

if val, ok := fieldMap[key].(string); ok {
var f interface{}
err := json.Unmarshal([]byte(val), &f)
if err != nil {
config.logger.Debug(
"Failed to parse json-encoded string",
zap.String("label key", key),
zap.String("label value", val),
zap.Error(err),
)
continue
}
fieldMap[key] = f
} else {
config.logger.Debug(
"Invalid json-encoded data. A string is expected",
zap.Any("type", reflect.TypeOf(fieldMap[key])),
zap.Any("value", reflect.ValueOf(fieldMap[key])),
)
}
}

// Create `_aws` section only if there are measurements
if len(cWMetric.Measurements) > 0 {
// Create `_aws` section only if there are measurements
Expand Down
15 changes: 12 additions & 3 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,22 @@ func TestTranslateCWMetricToEMF(t *testing.T) {
fields[oTellibDimensionKey] = "cloudwatch-otel"
fields["spanName"] = "test"
fields["spanCounter"] = 0
//add stringified json as attribute values
fields["kubernetes"] = "{\"container_name\":\"cloudwatch-agent\",\"docker\":{\"container_id\":\"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca\"},\"host\":\"ip-192-168-58-245.ec2.internal\",\"labels\":{\"controller-revision-hash\":\"5bdbf497dc\",\"name\":\"cloudwatch-agent\",\"pod-template-generation\":\"1\"},\"namespace_name\":\"amazon-cloudwatch\",\"pod_id\":\"e23f3413-af2e-4a98-89e0-5df2251e7f05\",\"pod_name\":\"cloudwatch-agent-26bl6\",\"pod_owners\":[{\"owner_kind\":\"DaemonSet\",\"owner_name\":\"cloudwatch-agent\"}]}"
fields["Sources"] = "[\"cadvisor\",\"pod\",\"calculated\"]"

config := &Config{
//include valid json string, a non-existing key, and keys whose value are not json/string
ParseJSONEncodedAttributeValues: []string{"kubernetes", "Sources", "NonExistingAttributeKey", "spanName", "spanCounter"},
logger: zap.NewNop(),
}

met := &CWMetrics{
TimestampMs: timestamp,
Fields: fields,
Measurements: []CWMeasurement{cwMeasurement},
}
inputLogEvent := translateCWMetricToEMF(met)
inputLogEvent := translateCWMetricToEMF(met, config)

assert.Equal(t, readFromFile("testdata/testTranslateCWMetricToEMF.json"), *inputLogEvent.InputLogEvent.Message, "Expect to be equal")
}
Expand Down Expand Up @@ -1994,7 +2003,7 @@ func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) {
Fields: fields,
Measurements: nil,
}
inputLogEvent := translateCWMetricToEMF(met)
inputLogEvent := translateCWMetricToEMF(met, &Config{})
expected := "{\"OTelLib\":\"cloudwatch-otel\",\"spanCounter\":0,\"spanName\":\"test\"}"

assert.Equal(t, expected, *inputLogEvent.InputLogEvent.Message)
Expand Down Expand Up @@ -2132,6 +2141,6 @@ func BenchmarkTranslateCWMetricToEMF(b *testing.B) {

b.ResetTimer()
for n := 0; n < b.N; n++ {
translateCWMetricToEMF(met)
translateCWMetricToEMF(met, &Config{})
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"OTelLib":"cloudwatch-otel","_aws":{"CloudWatchMetrics":[{"Namespace":"test-emf","Dimensions":[["OTelLib"],["OTelLib","spanName"]],"Metrics":[{"Name":"spanCounter","Unit":"Count"}]}],"Timestamp":1596151098037},"spanCounter":0,"spanName":"test"}
{"OTelLib":"cloudwatch-otel","Sources":["cadvisor","pod","calculated"],"_aws":{"CloudWatchMetrics":[{"Namespace":"test-emf","Dimensions":[["OTelLib"],["OTelLib","spanName"]],"Metrics":[{"Name":"spanCounter","Unit":"Count"}]}],"Timestamp":1596151098037},"kubernetes":{"container_name":"cloudwatch-agent","docker":{"container_id":"fc1b0a4c3faaa1808e187486a3a90cbea883dccaf2e2c46d4069d663b032a1ca"},"host":"ip-192-168-58-245.ec2.internal","labels":{"controller-revision-hash":"5bdbf497dc","name":"cloudwatch-agent","pod-template-generation":"1"},"namespace_name":"amazon-cloudwatch","pod_id":"e23f3413-af2e-4a98-89e0-5df2251e7f05","pod_name":"cloudwatch-agent-26bl6","pod_owners":[{"owner_kind":"DaemonSet","owner_name":"cloudwatch-agent"}]},"spanCounter":0,"spanName":"test"}

0 comments on commit 94e7ff1

Please sign in to comment.