diff --git a/processor/resourcedetectionprocessor/README.md b/processor/resourcedetectionprocessor/README.md index cc1b4e9c2ff0..e3a6b2eda2de 100644 --- a/processor/resourcedetectionprocessor/README.md +++ b/processor/resourcedetectionprocessor/README.md @@ -34,10 +34,26 @@ to read resource information from the [GCE metadata server](https://cloud.google * host.name * host.type +* Amazon ECS: Queries the [Task Metadata Endpoint](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html) (TMDE) to record information about the current ECS Task. Only TMDE V4 and V3 are supported. + + * cloud.provider (aws) + * cloud.account.id + * cloud.region + * cloud.zone + * cloud.infrastructure_service + * aws.ecs.cluster + * aws.ecs.task.arn + * aws.ecs.task.family + * aws.ecs.launchtype (V4 only) + * aws.log.group.names (V4 only) + * aws.log.group.arns (V4 only) + * aws.log.stream.names (V4 only) + * aws.log.stream.arns (V4 only) + ## Configuration ```yaml -# a list of resource detectors to run, valid options are: "env", "gce", "ec2" +# a list of resource detectors to run, valid options are: "env", "gce", "ec2", "ecs" detectors: [ ] # determines if existing resource attributes should be overridden or preserved, defaults to true override: diff --git a/processor/resourcedetectionprocessor/factory.go b/processor/resourcedetectionprocessor/factory.go index 061c0505be98..2c68a888d186 100644 --- a/processor/resourcedetectionprocessor/factory.go +++ b/processor/resourcedetectionprocessor/factory.go @@ -28,6 +28,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ec2" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ecs" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/env" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/gcp/gce" ) @@ -54,6 +55,7 @@ func NewFactory() component.ProcessorFactory { env.TypeStr: env.NewDetector, gce.TypeStr: gce.NewDetector, ec2.TypeStr: ec2.NewDetector, + ecs.TypeStr: ecs.NewDetector, }) f := &factory{ diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go new file mode 100644 index 000000000000..5ce1f3011f26 --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go @@ -0,0 +1,183 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecs + +import ( + "bytes" + "context" + "fmt" + "log" + "net/http" + "os" + "strings" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/conventions" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" +) + +const ( + TypeStr = "ecs" + tmde3EnvVar = "ECS_CONTAINER_METADATA_URI" + tmde4EnvVar = "ECS_CONTAINER_METADATA_URI_V4" +) + +var _ internal.Detector = (*Detector)(nil) + +type Detector struct { + provider ecsMetadataProvider +} + +func NewDetector() (internal.Detector, error) { + return &Detector{provider: &ecsMetadataProviderImpl{client: &http.Client{}}}, nil +} + +// Records metadata retrieved from the ECS Task Metadata Endpoint (TMDE) as resource attributes +// TODO: Replace all attribute fields and enums with values defined in "conventions" once they exist +func (d *Detector) Detect(context.Context) (pdata.Resource, error) { + res := pdata.NewResource() + res.InitEmpty() + + tmde := getTmdeFromEnv() + + // Fail fast if neither env var is present + if tmde == "" { + log.Println("No Task Metadata Endpoint environment variable detected, skipping ECS resource detection") + return res, nil + } + + tmdeResp, err := d.provider.fetchTaskMetaData(tmde) + + if err != nil || tmdeResp == nil { + return res, err + } + + attr := res.Attributes() + attr.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) + attr.InsertString("cloud.infrastructure_service", "ECS") + attr.InsertString("aws.ecs.task.arn", tmdeResp.TaskARN) + attr.InsertString("aws.ecs.task.family", tmdeResp.Family) + + // TMDE returns the the short name or ARN, so we need to parse out the short name from ARN if applicable + cluster := parseCluster(tmdeResp.Cluster) + attr.InsertString("aws.ecs.cluster", cluster) + + region, account := parseRegionAndAccount(tmdeResp.TaskARN) + if account != "" { + attr.InsertString(conventions.AttributeCloudAccount, account) + } + + if region != "" { + attr.InsertString(conventions.AttributeCloudRegion, region) + } + + // The Availability Zone is not available in all Fargate runtimes + if tmdeResp.AvailabilityZone != "" { + attr.InsertString(conventions.AttributeCloudZone, tmdeResp.AvailabilityZone) + } + + // The launch type and log data attributes are only available in TMDE v4 + switch lt := strings.ToLower(tmdeResp.LaunchType); lt { + case "ec2": + attr.InsertString("aws.ecs.launchtype", "EC2") + + case "fargate": + attr.InsertString("aws.ecs.launchtype", "Fargate") + } + + selfMetaData, err := d.provider.fetchContainerMetaData(tmde) + + if err != nil || selfMetaData == nil { + return res, err + } + + logAttributes := [4]string{"aws.log.group.names", "aws.log.group.arns", "aws.log.stream.names", "aws.log.stream.arns"} + + for i, attribVal := range getValidLogData(tmdeResp.Containers, selfMetaData, account) { + if attribVal.Len() > 0 { + ava := pdata.NewAttributeValueArray() + ava.SetArrayVal(attribVal) + attr.Insert(logAttributes[i], ava) + } + } + + return res, nil +} + +func getTmdeFromEnv() string { + var tmde string + if tmde = strings.TrimSpace(os.Getenv(tmde4EnvVar)); tmde == "" { + tmde = strings.TrimSpace(os.Getenv(tmde3EnvVar)) + } + + return tmde +} + +func parseCluster(cluster string) string { + i := bytes.IndexByte([]byte(cluster), byte('/')) + if i != -1 { + return cluster[i+1:] + } + + return cluster +} + +// Parses the AWS Account ID and AWS Region from a task ARN +// See: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-account-settings.html#ecs-resource-ids +func parseRegionAndAccount(taskARN string) (region string, account string) { + parts := strings.Split(taskARN, ":") + if len(parts) >= 5 { + return parts[3], parts[4] + } + + return "", "" +} + +// Filter out non-normal containers, our own container since we assume the collector is run as a sidecar, +// "init" containers which only run at startup then shutdown (as indicated by the "KnownStatus" attribute), +// containers not using AWS Logs, and those without log group metadata to get the final lists of valid log data +// See: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html#task-metadata-endpoint-v4-response +func getValidLogData(containers []Container, self *Container, account string) [4]pdata.AnyValueArray { + logGroupNames := pdata.NewAnyValueArray() + logGroupArns := pdata.NewAnyValueArray() + logStreamNames := pdata.NewAnyValueArray() + logStreamArns := pdata.NewAnyValueArray() + + for _, container := range containers { + logData := container.LogOptions + if container.Type == "NORMAL" && + container.KnownStatus == "RUNNING" && + container.LogDriver == "awslogs" && + self.DockerID != container.DockerID && + logData != (LogData{}) { + + logGroupNames.Append(pdata.NewAttributeValueString(logData.LogGroup)) + logGroupArns.Append(pdata.NewAttributeValueString(constructLogGroupArn(logData.Region, account, logData.LogGroup))) + logStreamNames.Append(pdata.NewAttributeValueString(logData.Stream)) + logStreamArns.Append(pdata.NewAttributeValueString(constructLogStreamArn(logData.Region, account, logData.LogGroup, logData.Stream))) + } + } + + return [4]pdata.AnyValueArray{logGroupNames, logGroupArns, logStreamNames, logStreamArns} +} + +func constructLogGroupArn(region, account, group string) string { + return fmt.Sprintf("arn:aws:logs:%s:%s:log-group:%s", region, account, group) +} + +func constructLogStreamArn(region, account, group, stream string) string { + return fmt.Sprintf("%s:log-stream:%s", constructLogGroupArn(region, account, group), stream) +} diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go new file mode 100644 index 000000000000..c2ee9c625219 --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go @@ -0,0 +1,182 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecs + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/consumer/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" +) + +type mockMetaDataProvider struct { + isV4 bool +} + +var _ ecsMetadataProvider = (*mockMetaDataProvider)(nil) + +func (md *mockMetaDataProvider) fetchTaskMetaData(tmde string) (*TaskMetaData, error) { + c := createTestContainer(md.isV4) + c.DockerID = "05281997" // Simulate one "application" and one "collector" container + cs := []Container{createTestContainer(md.isV4), c} + tmd := &TaskMetaData{ + Cluster: "cluster", + TaskARN: "arn:aws:ecs:us-west-2:123456789123:task/123", + Family: "family", + AvailabilityZone: "us-west-2a", + Containers: cs, + } + + if md.isV4 { + tmd.LaunchType = "EC2" + } + + return tmd, nil +} + +func (md *mockMetaDataProvider) fetchContainerMetaData(tmde string) (*Container, error) { + c := createTestContainer(md.isV4) + return &c, nil +} + +func Test_ecsNewDetector(t *testing.T) { + d, err := NewDetector() + + assert.NotNil(t, d) + assert.Nil(t, err) +} + +func Test_detectorReturnsIfNoEnvVars(t *testing.T) { + os.Clearenv() + d, _ := NewDetector() + res, err := d.Detect(context.TODO()) + + assert.Nil(t, err) + assert.Equal(t, 0, res.Attributes().Len()) +} + +func Test_ecsPrefersLatestTmde(t *testing.T) { + os.Clearenv() + os.Setenv(tmde3EnvVar, "3") + os.Setenv(tmde4EnvVar, "4") + + tmde := getTmdeFromEnv() + + assert.Equal(t, "4", tmde) +} + +func Test_ecsFiltersInvalidContainers(t *testing.T) { + // Should ignore empty container + c1 := Container{} + + // Should ignore non-normal container + c2 := createTestContainer(true) + c2.Type = "INTERNAL" + + // Should ignore stopped containers + c3 := createTestContainer(true) + c3.KnownStatus = "STOPPED" + + // Should ignore its own container + c4 := createTestContainer(true) + + containers := []Container{c1, c2, c3, c4} + + ld := getValidLogData(containers, &c4, "123") + + for _, attrib := range ld { + assert.Equal(t, 0, attrib.Len()) + } +} + +func Test_ecsDetectV4(t *testing.T) { + os.Clearenv() + os.Setenv(tmde4EnvVar, "endpoint") + + want := pdata.NewResource() + want.InitEmpty() + attr := want.Attributes() + attr.InsertString("cloud.provider", "aws") + attr.InsertString("cloud.infrastructure_service", "ECS") + attr.InsertString("aws.ecs.cluster", "cluster") + attr.InsertString("aws.ecs.task.arn", "arn:aws:ecs:us-west-2:123456789123:task/123") + attr.InsertString("aws.ecs.task.family", "family") + attr.InsertString("cloud.region", "us-west-2") + attr.InsertString("cloud.zone", "us-west-2a") + attr.InsertString("cloud.account.id", "123456789123") + attr.InsertString("aws.ecs.launchtype", "EC2") + + attribFields := []string{"aws.log.group.names", "aws.log.group.arns", "aws.log.stream.names", "aws.log.stream.arns"} + attribVals := []string{"group", "arn:aws:logs:us-east-1:123456789123:log-group:group", "stream", "arn:aws:logs:us-east-1:123456789123:log-group:group:log-stream:stream"} + + for i, field := range attribFields { + av := pdata.NewAnyValueArray() + av.Append(pdata.NewAttributeValueString(attribVals[i])) + ava := pdata.NewAttributeValueArray() + ava.SetArrayVal(av) + attr.Insert(field, ava) + } + + d := Detector{provider: &mockMetaDataProvider{isV4: true}} + got, err := d.Detect(context.TODO()) + + assert.Nil(t, err) + assert.NotNil(t, got) + assert.Equal(t, internal.AttributesToMap(want.Attributes()), internal.AttributesToMap(got.Attributes())) +} + +func Test_ecsDetectV3(t *testing.T) { + os.Clearenv() + os.Setenv(tmde3EnvVar, "endpoint") + + want := pdata.NewResource() + want.InitEmpty() + attr := want.Attributes() + attr.InsertString("cloud.provider", "aws") + attr.InsertString("cloud.infrastructure_service", "ECS") + attr.InsertString("aws.ecs.cluster", "cluster") + attr.InsertString("aws.ecs.task.arn", "arn:aws:ecs:us-west-2:123456789123:task/123") + attr.InsertString("aws.ecs.task.family", "family") + attr.InsertString("cloud.region", "us-west-2") + attr.InsertString("cloud.zone", "us-west-2a") + attr.InsertString("cloud.account.id", "123456789123") + + d := Detector{provider: &mockMetaDataProvider{isV4: false}} + got, err := d.Detect(context.TODO()) + + assert.Nil(t, err) + assert.NotNil(t, got) + assert.Equal(t, internal.AttributesToMap(want.Attributes()), internal.AttributesToMap(got.Attributes())) +} + +func createTestContainer(isV4 bool) Container { + c := Container{ + DockerID: "123", + Type: "NORMAL", + KnownStatus: "RUNNING", + } + + if isV4 { + c.LogDriver = "awslogs" + c.ContainerARN = "arn:aws:ecs" + c.LogOptions = LogData{LogGroup: "group", Region: "us-east-1", Stream: "stream"} + } + + return c +} diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/metadata.go b/processor/resourcedetectionprocessor/internal/aws/ecs/metadata.go new file mode 100644 index 000000000000..3ad306605b12 --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/metadata.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecs + +import "net/http" + +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) +} + +type ecsMetadataProvider interface { + fetchTaskMetaData(tmde string) (*TaskMetaData, error) + fetchContainerMetaData(tmde string) (*Container, error) +} diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/metadata_ecs.go b/processor/resourcedetectionprocessor/internal/aws/ecs/metadata_ecs.go new file mode 100644 index 000000000000..b961a3c5051b --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/metadata_ecs.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecs + +import ( + "encoding/json" + "log" + "net/http" +) + +type TaskMetaData struct { + Cluster string + LaunchType string // TODO: Change to enum when defined in otel collector convent + TaskARN string + Family string + AvailabilityZone string + Containers []Container +} + +type Container struct { + DockerID string `json:"DockerId"` + ContainerARN string + Type string + KnownStatus string + LogDriver string + LogOptions LogData +} + +type LogData struct { + LogGroup string `json:"awslogs-group"` + Region string `json:"awslogs-region"` + Stream string `json:"awslogs-stream"` +} + +type ecsMetadataProviderImpl struct { + client HTTPClient +} + +var _ ecsMetadataProvider = &ecsMetadataProviderImpl{} + +// Retrieves the metadata for a task running on Amazon ECS +func (md *ecsMetadataProviderImpl) fetchTaskMetaData(tmde string) (*TaskMetaData, error) { + ret, err := fetch(tmde+"/task", md, true) + if ret == nil { + return nil, err + } + + return ret.(*TaskMetaData), err +} + +// Retrieves the metadata for the Amazon ECS Container the collector is running on +func (md *ecsMetadataProviderImpl) fetchContainerMetaData(tmde string) (*Container, error) { + ret, err := fetch(tmde, md, false) + if ret == nil { + return nil, err + } + + return ret.(*Container), err +} + +func fetch(tmde string, md *ecsMetadataProviderImpl, task bool) (tmdeResp interface{}, err error) { + req, err := http.NewRequest(http.MethodGet, tmde, nil) + + if err != nil { + log.Printf("Received error constructing request to ECS Task Metadata Endpoint: %v", err) + return nil, err + } + + resp, err := md.client.Do(req) + + if err != nil { + log.Printf("Received error from ECS Task Metadata Endpoint: %v", err) + return nil, err + } + + if task { + tmdeResp = &TaskMetaData{} + } else { + tmdeResp = &Container{} + } + + err = json.NewDecoder(resp.Body).Decode(tmdeResp) + defer resp.Body.Close() + + if err != nil { + log.Printf("Encountered unexpected error reading response from ECS Task Metadata Endpoint: %v", err) + return nil, err + } + + return tmdeResp, nil +} diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/metadata_ecs_test.go b/processor/resourcedetectionprocessor/internal/aws/ecs/metadata_ecs_test.go new file mode 100644 index 000000000000..55435dbd4be2 --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/metadata_ecs_test.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecs + +import ( + "bytes" + "errors" + "io/ioutil" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + taskMeta = `{"Cluster":"myCluster", + "TaskARN":"arn:aws:ecs:ap-southeast-1:123456789123:task/123", + "Family":"myFamily", + "LaunchType":"ec2", + "AvailabilityZone":"ap-southeast-1a", + "Containers": [] + }` + + containerMeta = `{ + "DockerId":"abcdef12345", + "Type":"NORMAL", + "KnownStatus":"RUNNING", + "LogDriver":"awslogs", + "LogOptions": { + "awslogs-group":"helloworld", + "awslogs-region":"ap-southeast-1", + "awslogs-stream":"logs/main/456" + }, + "ContainerARN":"arn:aws:ecs:ap-southeast-1:123456789123:container/123" + }` +) + +type mockClient struct { + response string + retErr bool +} + +func (mc *mockClient) Do(req *http.Request) (*http.Response, error) { + if mc.retErr { + return nil, errors.New("fake error") + } + + r := ioutil.NopCloser(bytes.NewReader([]byte(mc.response))) + return &http.Response{ + StatusCode: 200, + Body: r, + }, nil +} + +func Test_ecsMetadata_fetchTask(t *testing.T) { + md := ecsMetadataProviderImpl{client: &mockClient{response: taskMeta, retErr: false}} + fetchResp, err := md.fetchTaskMetaData("url") + + assert.Nil(t, err) + assert.Equal(t, "myCluster", fetchResp.Cluster) + assert.Equal(t, "arn:aws:ecs:ap-southeast-1:123456789123:task/123", fetchResp.TaskARN) + assert.Equal(t, "myFamily", fetchResp.Family) + assert.Equal(t, "ec2", fetchResp.LaunchType) + assert.Equal(t, "ap-southeast-1a", fetchResp.AvailabilityZone) + assert.Empty(t, fetchResp.Containers) +} + +func Test_ecsMetadata_fetchContainer(t *testing.T) { + md := ecsMetadataProviderImpl{client: &mockClient{response: containerMeta, retErr: false}} + fetchResp, err := md.fetchContainerMetaData("url") + + assert.Nil(t, err) + assert.NotNil(t, fetchResp) + assert.Equal(t, "abcdef12345", fetchResp.DockerID) + assert.Equal(t, "arn:aws:ecs:ap-southeast-1:123456789123:container/123", fetchResp.ContainerARN) + assert.Equal(t, "RUNNING", fetchResp.KnownStatus) + assert.Equal(t, "awslogs", fetchResp.LogDriver) + assert.Equal(t, "helloworld", fetchResp.LogOptions.LogGroup) + assert.Equal(t, "ap-southeast-1", fetchResp.LogOptions.Region) + assert.Equal(t, "logs/main/456", fetchResp.LogOptions.Stream) +} + +func Test_ecsMetadata_returnsError(t *testing.T) { + md := ecsMetadataProviderImpl{client: &mockClient{response: "{}", retErr: true}} + fetchResp, err := md.fetchContainerMetaData("url") + + assert.Nil(t, fetchResp) + assert.NotNil(t, err) +} diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index 10a74d73b133..bba5e5a270a9 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -130,22 +130,39 @@ func (p *ResourceProvider) detectResource(ctx context.Context) { func AttributesToMap(am pdata.AttributeMap) map[string]interface{} { mp := make(map[string]interface{}, am.Len()) am.ForEach(func(k string, v pdata.AttributeValue) { - switch v.Type() { - case pdata.AttributeValueBOOL: - mp[k] = v.BoolVal() - case pdata.AttributeValueINT: - mp[k] = v.IntVal() - case pdata.AttributeValueDOUBLE: - mp[k] = v.DoubleVal() - case pdata.AttributeValueSTRING: - mp[k] = v.StringVal() - case pdata.AttributeValueMAP: - mp[k] = AttributesToMap(v.MapVal()) - } + mp[k] = UnwrapAttribute(v) }) return mp } +func UnwrapAttribute(v pdata.AttributeValue) interface{} { + switch v.Type() { + case pdata.AttributeValueBOOL: + return v.BoolVal() + case pdata.AttributeValueINT: + return v.IntVal() + case pdata.AttributeValueDOUBLE: + return v.DoubleVal() + case pdata.AttributeValueSTRING: + return v.StringVal() + case pdata.AttributeValueARRAY: + return getSerializableArray(v.ArrayVal()) + case pdata.AttributeValueMAP: + return AttributesToMap(v.MapVal()) + default: + return nil + } +} + +func getSerializableArray(inArr pdata.AnyValueArray) []interface{} { + var outArr []interface{} + for i := 0; i < inArr.Len(); i++ { + outArr = append(outArr, UnwrapAttribute(inArr.At(i))) + } + + return outArr +} + func MergeResource(to, from pdata.Resource, overrideTo bool) { if IsEmptyResource(from) { return diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go index e0df877026cd..89100a165963 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go @@ -228,6 +228,10 @@ func TestAttributesToMap(t *testing.T) { "map": map[string]interface{}{ "inner": "val", }, + "array": []interface{}{ + "inner", + int64(42), + }, } attr := pdata.NewAttributeMap() attr.InsertString("str", "a") @@ -239,6 +243,12 @@ func TestAttributesToMap(t *testing.T) { innerAttr.InsertString("inner", "val") avm.SetMapVal(innerAttr) attr.Insert("map", avm) + ava := pdata.NewAttributeValueArray() + arrayAttr := pdata.NewAnyValueArray() + arrayAttr.Append(pdata.NewAttributeValueString("inner")) + arrayAttr.Append(pdata.NewAttributeValueInt(42)) + ava.SetArrayVal(arrayAttr) + attr.Insert("array", ava) assert.Equal(t, m, AttributesToMap(attr)) } diff --git a/processor/resourcedetectionprocessor/testdata/config.yaml b/processor/resourcedetectionprocessor/testdata/config.yaml index 84a11af8d22c..b30ec235c825 100644 --- a/processor/resourcedetectionprocessor/testdata/config.yaml +++ b/processor/resourcedetectionprocessor/testdata/config.yaml @@ -11,6 +11,10 @@ processors: detectors: [env, ec2] timeout: 2s override: false + resourcedetection/ecs: + detectors: [env, ecs] + timeout: 2s + override: false exporters: exampleexporter: @@ -20,7 +24,8 @@ service: metrics: receivers: [examplereceiver] processors: - # Choose one depending on your cloud provider: + # Choose one depending on your cloud provider and environment: # - resourcedetection/gce # - resourcedetection/ec2 + # - resourcedetection/ecs exporters: [exampleexporter]