diff --git a/CHANGELOG.md b/CHANGELOG.md index 51af7faf9a490..194c64ef23d97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +#### New Outputs + +- [opentelemetry](/plugins/outputs/opentelemetry/README.md) - Contributed by @codeboten ## v1.18.2 [2021-04-28] #### Bugfixes diff --git a/README.md b/README.md index b2d8e6a548d3c..83b9483326e13 100644 --- a/README.md +++ b/README.md @@ -444,6 +444,7 @@ For documentation on the latest development code see the [documentation index][d * [nats](./plugins/outputs/nats) * [newrelic](./plugins/outputs/newrelic) * [nsq](./plugins/outputs/nsq) +* [opentelemetry](./plugins/outputs/opentelemetry) * [opentsdb](./plugins/outputs/opentsdb) * [prometheus](./plugins/outputs/prometheus_client) * [riemann](./plugins/outputs/riemann) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 5160db820730f..ee9e7181af8c4 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -1254,6 +1254,34 @@ # separator = "_" +# # Configuration for OpenTelemetry to send metrics to +# [[outputs.opentelemetry]] +# ## OpenTelemetry endpoint +# endpoint = "http://localhost:4317" +# +# ## Timeout when sending data over grpc +# timeout = "10s" +# +# ## Compression used to send data, supports: "gzip", "none" +# compression = "gzip" +# +# ## Optional TLS Config for use on gRPC connections. +# tls_ca = "/etc/telegraf/ca.pem" +# tls_cert = "/etc/telegraf/cert.pem" +# tls_key = "/etc/telegraf/key.pem" +# ## Use TLS but skip chain & host verification +# insecure_skip_verify = false +# +# # Additional resource attributes +# [outputs.opentelemetry.attributes] +# "service.name" = "demo" +# +# # Additional grpc metadata +# [outputs.opentelemetry.headers] +# key1 = "value1" +# + + # # Configuration for the Prometheus client to spawn # [[outputs.prometheus_client]] # ## Address to listen on diff --git a/go.mod b/go.mod index e4bb9710b7dd5..f109d045a6192 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,6 @@ require ( github.com/gopcua/opcua v0.1.13 github.com/gorilla/mux v1.7.3 github.com/gosnmp/gosnmp v1.31.0 - github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/harlow/kinesis-consumer v0.3.1-0.20181230152818-2f58b136fee0 github.com/hashicorp/consul/api v1.6.0 @@ -139,6 +138,7 @@ require ( google.golang.org/api v0.29.0 google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70 google.golang.org/grpc v1.37.0 + google.golang.org/protobuf v1.26.0 gopkg.in/djherbis/times.v1 v1.2.0 gopkg.in/fatih/pool.v2 v2.0.0 // indirect gopkg.in/gorethink/gorethink.v3 v3.0.5 diff --git a/go.sum b/go.sum index 0b4946b557c49..591b5167a2d94 100644 --- a/go.sum +++ b/go.sum @@ -112,8 +112,6 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= -github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= -github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= @@ -175,8 +173,6 @@ github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQ github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.34.34 h1:5dC0ZU0xy25+UavGNEkQ/5MOQwxXDA2YXtjCL1HfYKI= -github.com/aws/aws-sdk-go v1.34.34 h1:5dC0ZU0xy25+UavGNEkQ/5MOQwxXDA2YXtjCL1HfYKI= -github.com/aws/aws-sdk-go v1.34.34/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go v1.34.34/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v1.1.0 h1:sKP6QWxdN1oRYjl+k6S3bpgBI+XUx/0mqVOLIw4lR/Q= @@ -376,8 +372,6 @@ github.com/go-openapi/errors v0.19.2/go.mod h1:qX0BLWsyaKfvhluLejVpVNwNRdXZhEbTA github.com/go-openapi/errors v0.19.3/go.mod h1:qX0BLWsyaKfvhluLejVpVNwNRdXZhEbTA4kxxpKBC94= github.com/go-openapi/errors v0.19.4/go.mod h1:qX0BLWsyaKfvhluLejVpVNwNRdXZhEbTA4kxxpKBC94= github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0= -github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0= -github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0= github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.18.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= @@ -385,8 +379,6 @@ github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDB github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= -github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= -github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.18.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc= @@ -405,8 +397,6 @@ github.com/go-openapi/runtime v0.19.0/go.mod h1:OwNfisksmmaZse4+gpV3Ne9AyMOlP1lt github.com/go-openapi/runtime v0.19.4/go.mod h1:X277bwSUBxVlCYR3r7xgZZGKVvBd/29gLDlFGtJ8NL4= github.com/go-openapi/runtime v0.19.15/go.mod h1:dhGWCTKRXlAfGnQG0ONViOZpjfg0m2gUt9nTQPQZuoo= github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= -github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= -github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= github.com/go-openapi/spec v0.17.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI= github.com/go-openapi/spec v0.18.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI= github.com/go-openapi/spec v0.19.2/go.mod h1:sCxk3jxKgioEJikev4fgkNmwS+3kuYdJtcsZsD5zxMY= @@ -598,9 +588,8 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-gateway v1.14.6 h1:8ERzHx8aj1Sc47mu9n/AksaKCSWrMchFtkdrS4BIj5o= github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4GkDEdKCRJduHpTxT3/jcw= -github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= -github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/harlow/kinesis-consumer v0.3.1-0.20181230152818-2f58b136fee0 h1:U0KvGD9CJIl1nbgu9yLsfWxMT6WqL8fG0IBB7RvOZZQ= @@ -1428,6 +1417,7 @@ golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190617190820-da514acc4774/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= @@ -1550,7 +1540,6 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 61270d5ad412e..f1992194c98cb 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -33,6 +33,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/nats" _ "github.com/influxdata/telegraf/plugins/outputs/newrelic" _ "github.com/influxdata/telegraf/plugins/outputs/nsq" + _ "github.com/influxdata/telegraf/plugins/outputs/opentelemetry" _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" _ "github.com/influxdata/telegraf/plugins/outputs/riemann" diff --git a/plugins/outputs/opentelemetry/README.md b/plugins/outputs/opentelemetry/README.md new file mode 100644 index 0000000000000..3295b645139f8 --- /dev/null +++ b/plugins/outputs/opentelemetry/README.md @@ -0,0 +1,38 @@ +# OpenTelemetry Output Plugin + +This plugin writes to any backend that support the [OpenTelemetry Protocol (OTLP)](https://github.com/open-telemetry/opentelemetry-specification/tree/main/specification/protocol). Metrics are named by combining the metric name and field key - eg: `cpu.usage_user`. Additional [resource attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/sdk.md#resource-sdk) can be configured by setting `attributes` in your configuration. If the endpoint you're connecting to requires additional gRPC headers, this data can be configured via the `headers` option. + +### Configuration + +```toml +[[outputs.opentelemetry]] + ## OpenTelemetry endpoint + # endpoint = "http://localhost:4317" + + ## Timeout when sending data over grpc + # timeout = "10s" + + ## Compression used to send data, supports: "gzip", "none" + # compression = "gzip" + + ## Optional TLS Config for use on gRPC connections. + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + # Additional resource attributes + [outputs.opentelemetry.attributes] + "service.name" = "demo" + + # Additional grpc metadata + [outputs.opentelemetry.headers] + key1 = "value1" + +``` + +### Restrictions + +* OpenTelemetry protocol does not support string values in custom metrics, any string fields will be omitted and not written to the endpoint. +* The plugin implements the protocol using protobufs over gRPC, so the backend must support this protocol. \ No newline at end of file diff --git a/plugins/outputs/opentelemetry/client.go b/plugins/outputs/opentelemetry/client.go new file mode 100644 index 0000000000000..7b0362075c782 --- /dev/null +++ b/plugins/outputs/opentelemetry/client.go @@ -0,0 +1,222 @@ +package opentelemetry + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net" + "net/url" + "sync" + "time" + + metricsService "github.com/influxdata/influxdb-observability/otlp/collector/metrics/v1" + metricspb "github.com/influxdata/influxdb-observability/otlp/metrics/v1" + "github.com/influxdata/telegraf" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +const ( + // serviceConfig copied from OTel-Go + // https://github.com/open-telemetry/opentelemetry-go/blob/a2cecb6e80f6a0712187b080a97f8efb5a61082a/exporters/otlp/internal/otlpconfig/options.go#L47. + serviceConfig = `{ + "methodConfig":[{ + "name":[ + { "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" }, + { "service":"opentelemetry.proto.collector.trace.v1.TraceService" } + ], + "retryPolicy":{ + "MaxAttempts":5, + "InitialBackoff":"0.3s", + "MaxBackoff":"5s", + "BackoffMultiplier":2, + "RetryableStatusCodes":[ + "CANCELLED", + "DEADLINE_EXCEEDED", + "RESOURCE_EXHAUSTED", + "ABORTED", + "OUT_OF_RANGE", + "UNAVAILABLE", + "DATA_LOSS" + ] + } + }] +}` +) + +const maxTimeseriesPerRequest = 500 + +// client allows reading and writing from/to a remote gRPC endpoint. The +// implementation may hit a single backend, so the application should create a +// number of these clients. +type client struct { + logger telegraf.Logger + url *url.URL + timeout time.Duration + tlsConfig *tls.Config + headers metadata.MD + compressor string + + conn *grpc.ClientConn +} + +// connect will dial a new connection if one is not set. When +// dialing, this function uses its a new context and the same timeout +// used for store(). +func (c *client) connect(ctx context.Context) error { + if c.conn != nil { + return nil + } + + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + c.logger.Debugf("new OpenTelemetry connection, url=%s timeout=%s", c.url.String(), c.timeout) + + dopts := []grpc.DialOption{ + grpc.WithBlock(), // Wait for the connection to be established before using it. + grpc.WithDefaultServiceConfig(serviceConfig), + } + if c.url.Scheme != "http" { + dopts = append(dopts, grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConfig))) + } else { + dopts = append(dopts, grpc.WithInsecure()) + } + if c.compressor != "" && c.compressor != "none" { + dopts = append(dopts, grpc.WithDefaultCallOptions(grpc.UseCompressor(c.compressor))) + } + address := c.url.Hostname() + if len(c.url.Port()) > 0 { + address = net.JoinHostPort(address, c.url.Port()) + } + + var conn *grpc.ClientConn + + for { + var err error + conn, err = grpc.DialContext(ctx, address, dopts...) + if err != nil { + return err + } + + if err == nil { + service := metricsService.NewMetricsServiceClient(conn) + empty := &metricsService.ExportMetricsServiceRequest{} + + _, err = service.Export(metadata.NewOutgoingContext(ctx, c.headers), empty) + if err == nil { + break + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + if isRecoverable(err) { + c.logger.Infof("ping recoverable error, still trying, err=%w", err) + continue + } + } + return fmt.Errorf("non-recoverable failure in ping, err=%w", err) + } + c.conn = conn + return nil +} + +// store sends a batch of samples to the endpoint. +func (c *client) store(samples []*metricspb.ResourceMetrics) error { + metricsLen := len(samples) + if metricsLen == 0 { + // Nothing to do, return silently. + return nil + } + req := metricsService.ExportMetricsServiceRequest{ + ResourceMetrics: samples, + } + + // Note the call to connect() applies its own timeout for Dial(). + ctx := context.Background() + err := c.connect(ctx) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + service := metricsService.NewMetricsServiceClient(c.conn) + + errs := make(chan error, metricsLen/maxTimeseriesPerRequest+1) + var wg sync.WaitGroup + for i := 0; i < metricsLen; i += maxTimeseriesPerRequest { + end := i + maxTimeseriesPerRequest + if end > metricsLen { + end = metricsLen + } + wg.Add(1) + go func(begin int, end int) { + defer wg.Done() + reqCopy := &metricsService.ExportMetricsServiceRequest{ + ResourceMetrics: req.ResourceMetrics[begin:end], + } + + var md metadata.MD + var err error + + if _, err = service.Export(metadata.NewOutgoingContext(ctx, c.headers), reqCopy, grpc.Trailer(&md)); err != nil { + c.logger.Errorf("export failure, err=%w size=%d trailers=%v recoverable=%t", + err, + proto.Size(reqCopy), + md, + isRecoverable(err), + ) + errs <- err + return + } + + c.logger.Debug("successful write, records=%d size=%d trailers=%v", end-begin, proto.Size(reqCopy), md) + }(i, end) + } + wg.Wait() + close(errs) + if err, ok := <-errs; ok { + return err + } + return nil +} + +func (c *client) close() error { + if c.conn == nil { + return nil + } + return c.conn.Close() +} + +func isRecoverable(err error) bool { + if errors.Is(err, context.Canceled) { + return true + } + + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + s, ok := status.FromError(err) + if !ok { + return false + } + switch s.Code() { + case codes.DeadlineExceeded, codes.Canceled, codes.ResourceExhausted, + codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + // See https://github.com/open-telemetry/opentelemetry-specification/ + // blob/master/specification/protocol/otlp.md#response + return true + } + return false +} diff --git a/plugins/outputs/opentelemetry/client_test.go b/plugins/outputs/opentelemetry/client_test.go new file mode 100644 index 0000000000000..be7c394eb50b5 --- /dev/null +++ b/plugins/outputs/opentelemetry/client_test.go @@ -0,0 +1,85 @@ +package opentelemetry + +import ( + "context" + "net/url" + "testing" + "time" + + metricsService "github.com/influxdata/influxdb-observability/otlp/collector/metrics/v1" + metricspb "github.com/influxdata/influxdb-observability/otlp/metrics/v1" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestClientWithRecoverableError(t *testing.T) { + listener, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + grpcServer := grpc.NewServer() + mockMetricsServer := metricServiceServer{} + metricsService.RegisterMetricsServiceServer(grpcServer, &mockMetricsServer) + go func() { + err := grpcServer.Serve(listener) + require.NoError(t, err) + }() + defer grpcServer.Stop() + + u, err := url.Parse("https://" + listener.Addr().String()) + require.NoError(t, err) + + client := client{ + logger: testutil.Logger{}, + url: u, + timeout: time.Second, + } + err = client.connect(context.Background()) + require.True(t, isRecoverable(err), "expected recoverableError in error %v", err) +} + +func TestClientWithUnrecoverableError(t *testing.T) { + listener, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + grpcServer := grpc.NewServer() + mockMetricsServer := metricServiceServer{ + status: status.New(codes.InvalidArgument, "the request was missing some important arguments, change the arguments before retrying the request"), + } + metricsService.RegisterMetricsServiceServer(grpcServer, &mockMetricsServer) + go func() { + err := grpcServer.Serve(listener) + require.NoError(t, err) + }() + defer grpcServer.Stop() + + u, err := url.Parse("http://" + listener.Addr().String()) + require.NoError(t, err) + + client := client{ + logger: testutil.Logger{}, + url: u, + timeout: time.Second, + } + + err = client.connect(context.Background()) + require.False(t, isRecoverable(err), "expected unrecoverableError in error %v", err) + + err = client.store([]*metricspb.ResourceMetrics{{}}) + require.False(t, isRecoverable(err), "expected unrecoverableError in error %v", err) +} + +func TestEmptyRequest(t *testing.T) { + serverURL, err := url.Parse("http://localhost:12345") + require.NoError(t, err) + + c := client{ + logger: testutil.Logger{}, + url: serverURL, + timeout: time.Second, + } + + err = c.store([]*metricspb.ResourceMetrics{}) + require.NoError(t, err) +} diff --git a/plugins/outputs/opentelemetry/opentelemetry.go b/plugins/outputs/opentelemetry/opentelemetry.go new file mode 100644 index 0000000000000..5c8b8af883f6e --- /dev/null +++ b/plugins/outputs/opentelemetry/opentelemetry.go @@ -0,0 +1,236 @@ +package opentelemetry + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/pkg/errors" + + metricspb "github.com/influxdata/influxdb-observability/otlp/metrics/v1" + "google.golang.org/grpc/metadata" +) + +// OpenTelemetry is the OpenTelemetry Protocol config info. +type OpenTelemetry struct { + Endpoint string `toml:"endpoint"` + Timeout config.Duration `toml:"timeout"` + Compression string `toml:"compression"` + Headers map[string]string `toml:"headers"` + Attributes map[string]string `toml:"attributes"` + Log telegraf.Logger `toml:"-"` + tls.ClientConfig + + client *client + resourceTags []*telegraf.Tag +} + +const ( + // maxInt is the max int64 value. + maxInt = int64(^uint(0) >> 1) + + defaultEndpoint = "http://localhost:4317" + defaultTimeout = time.Second * 10 + defaultCompression = "gzip" + instrumentationLibraryName = "telegraf" +) + +var sampleConfig = ` + ## OpenTelemetry endpoint + # endpoint = "http://localhost:4317" + + ## Timeout when sending data over grpc + # timeout = "10s" + + ## Compression used to send data, supports: "gzip", "none" + # compression = "gzip" + + ## Optional TLS Config for use on gRPC connections. + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + # Additional resource attributes + [outputs.opentelemetry.attributes] + "service.name" = "demo" + + # Additional grpc metadata + [outputs.opentelemetry.headers] + key1 = "value1" + +` + +func (o *OpenTelemetry) Init() error { + if o.Endpoint == "" { + o.Endpoint = defaultEndpoint + } + endpoint, err := url.Parse(o.Endpoint) + if err != nil { + return errors.Wrap(err, "invalid endpoint configured") + } + + if o.Timeout < config.Duration(time.Second) { + o.Timeout = config.Duration(defaultTimeout) + } + + if o.Compression == "" { + o.Compression = defaultCompression + } + + for k, v := range o.Attributes { + o.resourceTags = append(o.resourceTags, &telegraf.Tag{Key: k, Value: v}) + } + + if o.Headers == nil { + o.Headers = make(map[string]string, 1) + } + + o.Headers["telemetry-reporting-agent"] = fmt.Sprintf( + "%s/%s", + instrumentationLibraryName, + internal.Version(), + ) + + tlsConfig, err := o.TLSConfig() + if err != nil { + return errors.Wrap(err, "invalid tls configuration") + } + + o.client = &client{ + logger: o.Log, + url: endpoint, + timeout: time.Duration(o.Timeout), + tlsConfig: tlsConfig, + headers: metadata.New(o.Headers), + compressor: o.Compression, + } + return nil +} + +// Connect initiates the primary connection to the OpenTelemetry endpoint. +func (o *OpenTelemetry) Connect() error { + ctx := context.Background() + if err := o.client.connect(ctx); err != nil { + if closeErr := o.client.close(); closeErr != nil { + return errors.Wrap(err, closeErr.Error()) + } + return err + } + + return nil +} + +// Write the metrics to OTLP destination +func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error { + batch := metrics + samples := []*metricspb.ResourceMetrics{} + currentTs := time.Now().UnixNano() + for _, m := range batch { + for _, f := range m.FieldList() { + sample, point := protoTimeseries(o.resourceTags, m, f) + labels := protoStringLabels(m.TagList()) + ts := m.Time().UnixNano() + + switch m.Type() { + case telegraf.Counter: + switch v := f.Value.(type) { + case uint64: + val := maxInt + if v <= uint64(maxInt) { + val = int64(v) + } + point.Data = &metricspb.Metric_IntSum{ + IntSum: monotonicIntegerPoint(labels, ts, currentTs, val), + } + case int64: + point.Data = &metricspb.Metric_IntSum{ + IntSum: monotonicIntegerPoint(labels, ts, currentTs, v), + } + case float64: + point.Data = &metricspb.Metric_DoubleSum{ + DoubleSum: monotonicDoublePoint(labels, ts, currentTs, v), + } + case bool: + val := int64(0) + if v { + val = 1 + } + point.Data = &metricspb.Metric_IntSum{ + IntSum: monotonicIntegerPoint(labels, ts, currentTs, val), + } + default: + o.Log.Errorf("get type failed: unsupported telegraf value type %v\n", f.Value) + continue + } + case telegraf.Gauge, telegraf.Untyped: + switch v := f.Value.(type) { + case uint64: + val := maxInt + if v <= uint64(maxInt) { + val = int64(v) + } + point.Data = &metricspb.Metric_IntGauge{ + IntGauge: intGauge(labels, ts, val), + } + case int64: + point.Data = &metricspb.Metric_IntGauge{ + IntGauge: intGauge(labels, ts, v), + } + case float64: + point.Data = &metricspb.Metric_DoubleGauge{ + DoubleGauge: doubleGauge(labels, ts, v), + } + case bool: + val := int64(0) + if v { + val = 1 + } + point.Data = &metricspb.Metric_IntGauge{ + IntGauge: intGauge(labels, ts, val), + } + default: + o.Log.Errorf("get type failed: unsupported telegraf value type %v\n", f.Value) + continue + } + default: + o.Log.Errorf("get type failed: unsupported telegraf metric kind %v\n", m.Type()) + continue + } + samples = append(samples, sample) + } + } + + if err := o.client.store(samples); err != nil { + return errors.Wrap(err, "unable to write to endpoint") + } + return nil +} + +// Close will terminate the session to the backend, returning error if an issue arises. +func (o *OpenTelemetry) Close() error { + return o.client.close() +} + +// SampleConfig returns the formatted sample configuration for the plugin. +func (o *OpenTelemetry) SampleConfig() string { + return sampleConfig +} + +// Description returns the human-readable function definition of the plugin. +func (o *OpenTelemetry) Description() string { + return "Configuration for OpenTelemetry to send metrics to" +} + +func init() { + outputs.Add("opentelemetry", func() telegraf.Output { + return &OpenTelemetry{} + }) +} diff --git a/plugins/outputs/opentelemetry/opentelemetry_test.go b/plugins/outputs/opentelemetry/opentelemetry_test.go new file mode 100644 index 0000000000000..f23b4cbdfddd9 --- /dev/null +++ b/plugins/outputs/opentelemetry/opentelemetry_test.go @@ -0,0 +1,288 @@ +package opentelemetry + +import ( + "context" + "fmt" + "net" + "strings" + "testing" + "time" + + metricsService "github.com/influxdata/influxdb-observability/otlp/collector/metrics/v1" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + "google.golang.org/grpc" + "google.golang.org/grpc/status" +) + +type metricServiceServer struct { + status *status.Status + server *grpc.Server + metricsService.UnimplementedMetricsServiceServer + reqs []*metricsService.ExportMetricsServiceRequest +} + +func (s *metricServiceServer) Export(ctx context.Context, req *metricsService.ExportMetricsServiceRequest) (*metricsService.ExportMetricsServiceResponse, error) { + var emptyValue = metricsService.ExportMetricsServiceResponse{} + s.reqs = append(s.reqs, req) + + if s.status == nil { + return &emptyValue, nil + } + + return nil, s.status.Err() +} + +func (s *metricServiceServer) clear() { + s.reqs = []*metricsService.ExportMetricsServiceRequest{} +} + +func (s *metricServiceServer) stop() { + s.server.Stop() +} + +func getListenerAndServer(t *testing.T) (net.Listener, *metricServiceServer) { + listener, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + grpcServer := grpc.NewServer() + mockMetricsServer := metricServiceServer{ + server: grpcServer, + status: nil, + } + metricsService.RegisterMetricsServiceServer(grpcServer, &mockMetricsServer) + go func() { + err := grpcServer.Serve(listener) + require.NoError(t, err) + }() + return listener, &mockMetricsServer +} + +func TestConfigOptions(t *testing.T) { + listener, mockMetricsServer := getListenerAndServer(t) + defer mockMetricsServer.stop() + + o := OpenTelemetry{ + Endpoint: ":::::", + Log: testutil.Logger{}, + } + err := o.Init() + require.Error(t, err) + require.True(t, strings.HasPrefix(err.Error(), "invalid endpoint configured")) + + o = OpenTelemetry{ + Endpoint: "http://" + listener.Addr().String(), + Compression: "none", + Log: testutil.Logger{}, + ClientConfig: tls.ClientConfig{ + TLSCA: "invalid_ca", + }, + } + err = o.Init() + require.Error(t, err) + require.True(t, strings.HasPrefix(err.Error(), "invalid tls configuration")) + + o = OpenTelemetry{ + Endpoint: "http://" + listener.Addr().String(), + Compression: "none", + Log: testutil.Logger{}, + } + err = o.Init() + require.NoError(t, err) + + err = o.Connect() + require.NoError(t, err) + + require.Equal(t, defaultTimeout, time.Duration(o.Timeout)) + require.Equal(t, map[string]string{"telemetry-reporting-agent": fmt.Sprint( + "telegraf/", + internal.Version(), + )}, o.Headers) + + attributes := map[string]string{ + "service.name": "test", + "service.version": "0.0.1", + } + o = OpenTelemetry{ + Endpoint: "http://" + listener.Addr().String(), + Timeout: config.Duration(time.Second * 10), + Compression: "none", + Attributes: attributes, + Log: testutil.Logger{}, + } + err = o.Init() + require.NoError(t, err) + + err = o.Connect() + require.NoError(t, err) + + require.Equal(t, time.Second*10, time.Duration(o.Timeout)) + require.Equal(t, len(o.resourceTags), 2) + for _, tag := range o.resourceTags { + require.Equal(t, attributes[tag.Key], tag.Value) + } +} + +func TestWrite(t *testing.T) { + listener, mockMetricsServer := getListenerAndServer(t) + defer mockMetricsServer.stop() + o := OpenTelemetry{ + Endpoint: "http://" + listener.Addr().String(), + Timeout: config.Duration(time.Second * 10), + Compression: "none", + Log: testutil.Logger{}, + } + err := o.Init() + require.NoError(t, err) + + err = o.Connect() + require.NoError(t, err) + + mockMetricsServer.clear() + err = o.Write(testutil.MockMetrics()) + require.NoError(t, err) + + require.Equal(t, 1, len(mockMetricsServer.reqs)) + request := mockMetricsServer.reqs[0] + + require.Equal(t, 1, len(request.ResourceMetrics[0].GetInstrumentationLibraryMetrics())) + require.Equal(t, instrumentationLibraryName, request.ResourceMetrics[0].GetInstrumentationLibraryMetrics()[0].GetInstrumentationLibrary().GetName()) +} + +func TestWriteSupportedMetricKinds(t *testing.T) { + listener, mockMetricsServer := getListenerAndServer(t) + defer mockMetricsServer.stop() + // Metrics in descending order of timestamp + metrics := []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "foo": "bar", + }, + map[string]interface{}{ + "value": 42, + }, + time.Unix(2, 0), + ), + testutil.MustMetric("cpu", + map[string]string{ + "foo": "foo", + }, + map[string]interface{}{ + "value": 43, + }, + time.Unix(3, 0), + ), + testutil.MustMetric("ram", + map[string]string{ + "foo": "bar", + }, + map[string]interface{}{ + "value": 42.1, + }, + time.Unix(4, 0), + ), + testutil.MustMetric("up", + map[string]string{ + "foo": "bar", + }, + map[string]interface{}{ + "value": false, + }, + time.Unix(4, 0), + ), + testutil.MustMetric("processes", + map[string]string{ + "foo": "foo", + }, + map[string]interface{}{ + "value": 43, + }, + time.Unix(3, 0), + ), + testutil.MustMetric("disk", + map[string]string{ + "foo": "bar", + }, + map[string]interface{}{ + "value": 43.9, + }, + time.Unix(1, 0), + ), + } + o := OpenTelemetry{ + Endpoint: "http://" + listener.Addr().String(), + Timeout: config.Duration(time.Second * 10), + Compression: "none", + Log: testutil.Logger{}, + } + err := o.Init() + require.NoError(t, err) + + err = o.Connect() + require.NoError(t, err) + + mockMetricsServer.clear() + err = o.Write(metrics) + require.NoError(t, err) + + require.Equal(t, 1, len(mockMetricsServer.reqs)) + require.Equal(t, len(metrics), len(mockMetricsServer.reqs[0].GetResourceMetrics())) +} + +func TestWriteIgnoresInvalidKinds(t *testing.T) { + listener, mockMetricsServer := getListenerAndServer(t) + defer mockMetricsServer.stop() + // Metrics in descending order of timestamp + metrics := []telegraf.Metric{ + testutil.MustMetric("custom_string_metric", + map[string]string{ + "foo": "bar", + }, + map[string]interface{}{ + "value": "string value", + }, + time.Unix(2, 0), + ), + testutil.MustMetric("histogram", + map[string]string{ + "foo": "bar", + }, + map[string]interface{}{ + "value": 1, + }, + time.Unix(2, 0), + telegraf.Histogram, + ), + testutil.MustMetric("summary", + map[string]string{ + "foo": "bar", + }, + map[string]interface{}{ + "value": 1, + }, + time.Unix(2, 0), + telegraf.Summary, + ), + } + o := OpenTelemetry{ + Endpoint: "http://" + listener.Addr().String(), + Timeout: config.Duration(time.Second * 10), + Compression: "none", + Log: testutil.Logger{}, + } + err := o.Init() + require.NoError(t, err) + + err = o.Connect() + require.NoError(t, err) + + mockMetricsServer.clear() + err = o.Write(metrics) + require.NoError(t, err) + + require.Equal(t, 0, len(mockMetricsServer.reqs)) +} diff --git a/plugins/outputs/opentelemetry/transform.go b/plugins/outputs/opentelemetry/transform.go new file mode 100644 index 0000000000000..c3a59ab3a57b1 --- /dev/null +++ b/plugins/outputs/opentelemetry/transform.go @@ -0,0 +1,122 @@ +package opentelemetry + +import ( + "fmt" + "time" + + commonpb "github.com/influxdata/influxdb-observability/otlp/common/v1" + metricpb "github.com/influxdata/influxdb-observability/otlp/metrics/v1" + resourcepb "github.com/influxdata/influxdb-observability/otlp/resource/v1" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" +) + +func monotonicIntegerPoint(labels []*commonpb.StringKeyValue, start, end int64, value int64) *metricpb.IntSum { + integer := &metricpb.IntDataPoint{ + Labels: labels, + StartTimeUnixNano: uint64(start), + TimeUnixNano: uint64(end), + Value: value, + } + return &metricpb.IntSum{ + IsMonotonic: true, + AggregationTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricpb.IntDataPoint{integer}, + } +} + +func monotonicDoublePoint(labels []*commonpb.StringKeyValue, start, end int64, value float64) *metricpb.DoubleSum { + double := &metricpb.DoubleDataPoint{ + Labels: labels, + StartTimeUnixNano: uint64(time.Duration(start) * time.Nanosecond), + TimeUnixNano: uint64(time.Duration(end) * time.Nanosecond), + Value: value, + } + return &metricpb.DoubleSum{ + IsMonotonic: true, + AggregationTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricpb.DoubleDataPoint{double}, + } +} + +func protoLabel(tag *telegraf.Tag) *commonpb.KeyValue { + return &commonpb.KeyValue{ + Key: tag.Key, + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_StringValue{ + StringValue: tag.Value, + }, + }, + } +} + +func protoStringLabel(tag *telegraf.Tag) *commonpb.StringKeyValue { + return &commonpb.StringKeyValue{ + Key: tag.Key, + Value: tag.Value, + } +} + +func protoResourceAttributes(tags []*telegraf.Tag) []*commonpb.KeyValue { + ret := make([]*commonpb.KeyValue, len(tags)) + for i := range tags { + ret[i] = protoLabel(tags[i]) + } + return ret +} + +func protoStringLabels(tags []*telegraf.Tag) []*commonpb.StringKeyValue { + ret := make([]*commonpb.StringKeyValue, len(tags)) + for i := range tags { + ret[i] = protoStringLabel(tags[i]) + } + return ret +} + +func protoResource(resourceTags []*telegraf.Tag) *resourcepb.Resource { + return &resourcepb.Resource{ + Attributes: protoResourceAttributes(resourceTags), + } +} + +func protoTimeseries(resourceTags []*telegraf.Tag, m telegraf.Metric, f *telegraf.Field) (*metricpb.ResourceMetrics, *metricpb.Metric) { + metric := &metricpb.Metric{ + Name: fmt.Sprintf("%s.%s", m.Name(), f.Key), + Description: "", // TODO + Unit: "", // TODO + } + return &metricpb.ResourceMetrics{ + Resource: protoResource(resourceTags), + InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{ + { + InstrumentationLibrary: &commonpb.InstrumentationLibrary{ + Name: instrumentationLibraryName, + Version: internal.Version(), + }, + Metrics: []*metricpb.Metric{metric}, + }, + }, + }, metric +} + +func intGauge(labels []*commonpb.StringKeyValue, ts int64, value int64) *metricpb.IntGauge { + integer := &metricpb.IntDataPoint{ + Labels: labels, + TimeUnixNano: uint64(ts), + Value: value, + } + return &metricpb.IntGauge{ + DataPoints: []*metricpb.IntDataPoint{integer}, + } +} + +func doubleGauge(labels []*commonpb.StringKeyValue, ts int64, value float64) *metricpb.DoubleGauge { + double := &metricpb.DoubleDataPoint{ + Labels: labels, + TimeUnixNano: uint64(ts), + Value: value, + } + return &metricpb.DoubleGauge{ + DataPoints: []*metricpb.DoubleDataPoint{double}, + } +}