Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct EMF output to stdout to support AWS Lambda #2720

Merged
merged 6 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The following exporter configuration parameters are supported.
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` |
| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}```| [ ] |
| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]|
Expand Down
6 changes: 6 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ type Config struct {
// MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch
MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"`

// OutputDestination is an option to specify the EMFExporter output. Default option is "cloudwatch"
// "cloudwatch" - direct the exporter output to CloudWatch backend
// "stdout" - direct the exporter output to stdout
// TODO: we can support directing output to a file (in the future) while customer specifies a file path here.
OutputDestination string `mapstructure:"output_destination"`

// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
OutputDestination: "cloudwatch",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
Expand All @@ -80,6 +81,7 @@ func TestLoadConfig(t *testing.T) {
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
OutputDestination: "cloudwatch",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: []*MetricDeclaration{},
Expand Down
52 changes: 33 additions & 19 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -31,6 +32,12 @@ import (
"go.uber.org/zap"
)

const (
// OutputDestination Options
outputDestinationCloudWatch = "cloudwatch"
outputDestinationStdout = "stdout"
)

type emfExporter struct {
//Each (log group, log stream) keeps a separate Pusher because of each (log group, log stream) requires separate stream token.
groupStreamToPusherMap map[string]map[string]Pusher
Expand Down Expand Up @@ -119,6 +126,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
groupedMetrics := make(map[interface{}]*GroupedMetric)
expConfig := emf.config.(*Config)
defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
outputDestination := expConfig.OutputDestination

for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
Expand All @@ -128,31 +136,37 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
for _, groupedMetric := range groupedMetrics {
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig)
putLogEvent := translateCWMetricToEMF(cWMetric, expConfig)
// Currently we only support two options for "OutputDestination".
if strings.EqualFold(outputDestination, outputDestinationStdout) {
fmt.Println(*putLogEvent.InputLogEvent.Message)
} else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
if logStream == "" {
logStream = defaultLogStream
}

logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
if logStream == "" {
logStream = defaultLogStream
}

pusher := emf.getPusher(logGroup, logStream)
if pusher != nil {
returnError := pusher.AddLogEntry(putLogEvent)
if returnError != nil {
return wrapErrorIfBadRequest(&returnError)
pusher := emf.getPusher(logGroup, logStream)
if pusher != nil {
returnError := pusher.AddLogEntry(putLogEvent)
if returnError != nil {
return wrapErrorIfBadRequest(&returnError)
}
}
}
}

for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
//TODO now we only have one pusher, so it's ok to return after first error occurred
err := wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
//TODO now we only have one pusher, so it's ok to return after first error occurred
err := wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
}
return err
}
return err
}
}

Expand Down
60 changes: 60 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,66 @@ func TestConsumeMetrics(t *testing.T) {
require.NoError(t, exp.Shutdown(ctx))
}

func TestConsumeMetricsWithOutputDestination(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.MaxRetries = 0
expCfg.OutputDestination = "stdout"
exp, err := New(expCfg, component.ExporterCreateParams{Logger: zap.NewNop()})
assert.Nil(t, err)
assert.NotNil(t, exp)

mdata := internaldata.MetricsData{
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "test-emf"},
LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"},
},
Resource: &resourcepb.Resource{
Labels: map[string]string{
"resource": "R1",
},
},
Metrics: []*metricspb.Metric{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "spanCounter",
Description: "Counting all the spans",
Unit: "Count",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "spanName"},
{Key: "isItAnError"},
},
},
Timeseries: []*metricspb.TimeSeries{
{
LabelValues: []*metricspb.LabelValue{
{Value: "testSpan", HasValue: true},
{Value: "false", HasValue: true},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1234567890123,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 1,
},
},
},
},
},
},
},
}
md := internaldata.OCToMetrics(mdata)
require.NoError(t, exp.ConsumeMetrics(ctx, md))
require.NoError(t, exp.Shutdown(ctx))
}

func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func createDefaultConfig() configmodels.Exporter {
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ParseJSONEncodedAttributeValues: make([]string, 0),
MetricDeclarations: make([]*MetricDeclaration, 0),
OutputDestination: "cloudwatch",
logger: nil,
}
}
Expand Down