Skip to content

Commit

Permalink
Direct EMF output to stdout to support AWS Lambda (#2720)
Browse files Browse the repository at this point in the history
* Direct EMF output to stdout to support AWS Lambda

* fix some old lib after merging from upstream

* Use output_destination in customer config

* use lowercase in output_destination config
  • Loading branch information
shaochengwang authored Mar 25, 2021
1 parent 0dcbf0a commit e8bf24c
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 19 deletions.
1 change: 1 addition & 0 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The following exporter configuration parameters are supported.
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` |
| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}```| [ ] |
| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]|
Expand Down
6 changes: 6 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ type Config struct {
// MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch
MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"`

// OutputDestination is an option to specify the EMFExporter output. Default option is "cloudwatch"
// "cloudwatch" - direct the exporter output to CloudWatch backend
// "stdout" - direct the exporter output to stdout
// TODO: we can support directing output to a file (in the future) while customer specifies a file path here.
OutputDestination string `mapstructure:"output_destination"`

// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
OutputDestination: "cloudwatch",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
Expand All @@ -80,6 +81,7 @@ func TestLoadConfig(t *testing.T) {
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
OutputDestination: "cloudwatch",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
Expand Down
52 changes: 33 additions & 19 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -31,6 +32,12 @@ import (
"go.uber.org/zap"
)

const (
// OutputDestination Options
outputDestinationCloudWatch = "cloudwatch"
outputDestinationStdout = "stdout"
)

type emfExporter struct {
//Each (log group, log stream) keeps a separate Pusher because of each (log group, log stream) requires separate stream token.
groupStreamToPusherMap map[string]map[string]Pusher
Expand Down Expand Up @@ -119,6 +126,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
groupedMetrics := make(map[interface{}]*GroupedMetric)
expConfig := emf.config.(*Config)
defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
outputDestination := expConfig.OutputDestination

for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
Expand All @@ -128,31 +136,37 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
for _, groupedMetric := range groupedMetrics {
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig)
putLogEvent := translateCWMetricToEMF(cWMetric, expConfig)
// Currently we only support two options for "OutputDestination".
if strings.EqualFold(outputDestination, outputDestinationStdout) {
fmt.Println(*putLogEvent.InputLogEvent.Message)
} else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
if logStream == "" {
logStream = defaultLogStream
}

logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
if logStream == "" {
logStream = defaultLogStream
}

pusher := emf.getPusher(logGroup, logStream)
if pusher != nil {
returnError := pusher.AddLogEntry(putLogEvent)
if returnError != nil {
return wrapErrorIfBadRequest(&returnError)
pusher := emf.getPusher(logGroup, logStream)
if pusher != nil {
returnError := pusher.AddLogEntry(putLogEvent)
if returnError != nil {
return wrapErrorIfBadRequest(&returnError)
}
}
}
}

for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
//TODO now we only have one pusher, so it's ok to return after first error occurred
err := wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
//TODO now we only have one pusher, so it's ok to return after first error occurred
err := wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
}
return err
}
return err
}
}

Expand Down
60 changes: 60 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,66 @@ func TestConsumeMetrics(t *testing.T) {
require.NoError(t, exp.Shutdown(ctx))
}

func TestConsumeMetricsWithOutputDestination(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.MaxRetries = 0
expCfg.OutputDestination = "stdout"
exp, err := New(expCfg, component.ExporterCreateParams{Logger: zap.NewNop()})
assert.Nil(t, err)
assert.NotNil(t, exp)

mdata := internaldata.MetricsData{
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "test-emf"},
LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"},
},
Resource: &resourcepb.Resource{
Labels: map[string]string{
"resource": "R1",
},
},
Metrics: []*metricspb.Metric{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "spanCounter",
Description: "Counting all the spans",
Unit: "Count",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "spanName"},
{Key: "isItAnError"},
},
},
Timeseries: []*metricspb.TimeSeries{
{
LabelValues: []*metricspb.LabelValue{
{Value: "testSpan", HasValue: true},
{Value: "false", HasValue: true},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1234567890123,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 1,
},
},
},
},
},
},
},
}
md := internaldata.OCToMetrics(mdata)
require.NoError(t, exp.ConsumeMetrics(ctx, md))
require.NoError(t, exp.Shutdown(ctx))
}

func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func createDefaultConfig() configmodels.Exporter {
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: make([]*MetricDeclaration, 0),
OutputDestination: "cloudwatch",
logger: nil,
}
}
Expand Down

0 comments on commit e8bf24c

Please sign in to comment.