Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service_http_server support OTLP log input #438

Merged
merged 4 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ your changes, such as:
- [public] [both] [updated] add a new feature

## [Unreleased]

- [public] [both] [added] service_http_server support OTLP log input.
- [public] [both] [added] add a new flusher_otlp_log plugin.
- [public] [linux] [fixed] strip binaries to reduce dist size.

2 changes: 2 additions & 0 deletions docs/cn/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* [Syslog数据](data-pipeline/input/service-syslog.md)
* [GPU数据](data-pipeline/input/service-gpu.md)
* [eBPF网络调用数据](data-pipeline/input/metric-observer.md)
* [HTTP数据](data-pipeline/input/service-http-service.md)
* [处理](data-pipeline/processor/README.md)
* [添加字段](data-pipeline/processor/processor-add-fields.md)
* [原始数据](data-pipeline/processor/default.md)
Expand All @@ -74,6 +75,7 @@
* [Kafka](data-pipeline/flusher/kafka.md)
* [SLS](data-pipeline/flusher/sls.md)
* [标准输出/文件](data-pipeline/flusher/stdout.md)
* [OTLP日志](data-pipeline/flusher/otlp-log.md)
* [加速](data-pipeline/accelerator/README.md)
* [分隔符加速](data-pipeline/accelerator/delimiter-accelerate.md)
* [Json加速](data-pipeline/accelerator/json-accelerate.md)
Expand Down
99 changes: 99 additions & 0 deletions docs/cn/data-pipeline/input/service-http-service.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# ServiceInput示例插件

## 简介

`service_http_server` `input`插件可以接收来自unix socket、http/https、tcp的请求,并支持sls协议、otlp等多种协议。

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
| ------- | ------ | -------- | -------------------------------------------- |
| Type | String | 是 | 插件类型,固定为`service_http_server` |
| Format | String | 否 | <p>数据格式。</p> <p>支持格式:`sls`、`prometheus`、`influxdb`、`otlp_logv1`、`statsd`</p> |
| Address | String | 否 | <p>监听地址。</p><p></p> |
| ReadTimeoutSec | String | 否 | <p>读取超时时间。</p><p>默认取值为:`10s`。</p> |
| ShutdownTimeoutSec | String | 否 | <p>关闭超时时间。</p><p>默认取值为:`5s`。</p> |
| MaxBodySize | String | 否 | <p>最大传输 body 大小。</p><p>默认取值为:`64k`。</p> |
| UnlinkUnixSock | String | 否 | <p>启动前如果监听地址为unix socket,是否进行强制释放。</p><p>默认取值为:`true`。</p> |

## 样例

### 接收 OTLP 日志

* 采集配置

```yaml
enable: true
inputs:
- Type: service_http_server
Format: "otlp_logv1"
Address: "http://127.0.0.1:12345"
flushers:
- Type: flusher_stdout
OnlyStdout: true
```

* 输入

使用 opentelemetry-java-sdk构造数据,基于[ExampleConfiguration.java](https://github.com/open-telemetry/opentelemetry-java-docs/blob/main/otlp/src/main/java/io/opentelemetry/example/otlp/ExampleConfiguration.java)、[OtlpExporterExample.java](https://github.com/open-telemetry/opentelemetry-java-docs/blob/main/otlp/src/main/java/io/opentelemetry/example/otlp/OtlpExporterExample.java)进行如下代码改造。

`ExampleConfiguration` 进行 `OTLP SDK` 初始化。
```java
static OpenTelemetrySdk initHTTPOpenTelemetry() {
// Include required service.name resource attribute on all spans and metrics
Resource resource =
Resource.getDefault()
.merge(Resource.builder().put(SERVICE_NAME, "OtlpExporterExample").build());

OpenTelemetrySdk openTelemetrySdk =
OpenTelemetrySdk.builder()
.setLogEmitterProvider(
SdkLogEmitterProvider.builder()
.setResource(resource)
.addLogProcessor(SimpleLogProcessor
.create(OtlpHttpLogExporter
.builder()
.setEndpoint("http://127.0.0.1:12345/v1/logs")
.build()))
.build())
.buildAndRegisterGlobal();

Runtime.getRuntime()
.addShutdownHook(new Thread(openTelemetrySdk.getSdkLogEmitterProvider()::shutdown));

return openTelemetrySdk;
}
```

`OtlpExporterExample` 构造数据。
```java
OpenTelemetrySdk openTelemetry = ExampleConfiguration.initHTTPOpenTelemetry();

LogEmitter logger = openTelemetry.getSdkLogEmitterProvider().get("io.opentelemetry.example");
logger
.logRecordBuilder()
.setBody("log body1")
.setAllAttributes(
Attributes.builder()
.put("k1", "v1")
.put("k2", "v2").build())
.setSeverity(Severity.INFO)
.setSeverityText("INFO")
.setEpoch(Instant.now())
.setContext(Context.current())
.emit();
```

* 输出

```
{
"time_unix_nano": "1663913736115000000",
"severity_number": "9",
"severity_text": "INFO",
"content": "log body1",
"attributes": "{\"k1\":\"v1\",\"k2\":\"v2\"}",
"resources": "{\"service.name\":\"OtlpExporterExample\",\"telemetry.sdk.language\":\"java\",\"telemetry.sdk.name\":\"opentelemetry\",\"telemetry.sdk.version\":\"1.18.0\"}",
"__time__": "1663913736"
}
```
2 changes: 2 additions & 0 deletions docs/cn/data-pipeline/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
| `service_syslog`<br>Syslog数据 | SLS官方 | 采集syslog数据。 |
| `service_gpu_metric`<br>GPU数据 | SLS官方 | 支持手机英伟达GPU指标。 |
| `observer_ilogtail_network`<br>无侵入网络调用数据 | SLS官方 | 支持从网络系统调用中收集四层网络调用,并借助网络解析模块,可以观测七层网络调用细节。 |
| `service_http_server otlp`<br>HTTP OTLP数据 | SLS官方 | 通过http协议,接收OTLP数据。 |

## 处理

Expand Down Expand Up @@ -48,6 +49,7 @@
| `flusher_kafka`<br>Kafka | 社区 | 将采集到的数据输出到Kafka。 |
| `flusher_sls`<br>SLS | SLS官方 | 将采集到的数据输出到SLS。 |
| `flusher_stdout`<br>标准输出/文件 | SLS官方 | 将采集到的数据输出到标准输出或文件。 |
| `flusher_otlp_log`<br>OTLP日志 | 社区<br>[`liuhaoyang`](https://github.com/liuhaoyang) | 将采集到的数据支持`Opentelemetry log protocol`的后端。 |

## 加速

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ require (
github.com/siddontang/go-mysql v0.0.0-20180725024449-535abe8f2eba
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/goconvey v1.7.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.0
github.com/syndtr/goleveldb v0.0.0-20170725064836-b89cc31ef797
go.opentelemetry.io/collector/pdata v0.60.0
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/atomic v1.7.0
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1439,17 +1439,20 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down Expand Up @@ -1558,6 +1561,8 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/collector/pdata v0.60.0 h1:jCNR5jtUom2FcUu30h4tw7enZytwGnXX6fs/K2FM/A0=
go.opentelemetry.io/collector/pdata v0.60.0/go.mod h1:0hqgNMRneVXaLNelv3q0XKJbyBW9aMDwyC15pKd30+E=
go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0/go.mod h1:vEhqr0m4eTc+DWxfsXoXue2GBgV2uUwVznkGIHW/e5w=
Expand Down
9 changes: 9 additions & 0 deletions helper/decoder/common/comon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
"github.com/pierrec/lz4"
)

const (
ProtocolSLS = "sls"
ProtocolPrometheus = "prometheus"
ProtocolInflux = "influx"
ProtocolInfluxdb = "influxdb"
ProtocolStatsd = "statsd"
ProtocolOTLPLogV1 = "otlp_logv1"
)

func CollectBody(res http.ResponseWriter, req *http.Request, maxBodySize int64) ([]byte, int, error) {
body := req.Body

Expand Down
12 changes: 8 additions & 4 deletions helper/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"strings"
"time"

"github.com/alibaba/ilogtail/helper/decoder/common"
"github.com/alibaba/ilogtail/helper/decoder/influxdb"
"github.com/alibaba/ilogtail/helper/decoder/opentelemetry"
"github.com/alibaba/ilogtail/helper/decoder/prometheus"
"github.com/alibaba/ilogtail/helper/decoder/sls"
"github.com/alibaba/ilogtail/helper/decoder/statsd"
Expand All @@ -39,16 +41,18 @@ var errDecoderNotFound = errors.New("no such decoder")
// GetDecoder return a new decoder for specific format
func GetDecoder(format string) (Decoder, error) {
switch strings.TrimSpace(strings.ToLower(format)) {
case "sls":
case common.ProtocolSLS:
return &sls.Decoder{}, nil
case "prometheus":
case common.ProtocolPrometheus:
return &prometheus.Decoder{}, nil
case "influx", "influxdb":
case common.ProtocolInflux, common.ProtocolInfluxdb:
return &influxdb.Decoder{}, nil
case "statsd":
case common.ProtocolStatsd:
return &statsd.Decoder{
Time: time.Now(),
}, nil
case common.ProtocolOTLPLogV1:
return &opentelemetry.Decoder{Format: common.ProtocolOTLPLogV1}, nil
}
return nil, errDecoderNotFound
}
125 changes: 125 additions & 0 deletions helper/decoder/opentelemetry/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opentelemetry

import (
"encoding/json"
"errors"
"net/http"
"strconv"

"go.opentelemetry.io/collector/pdata/plog/plogotlp"

"github.com/alibaba/ilogtail/helper/decoder/common"
"github.com/alibaba/ilogtail/pkg/protocol"
)

const (
pbContentType = "application/x-protobuf"
jsonContentType = "application/json"
)

// Decoder impl
type Decoder struct {
Format string
}

// Decode impl
func (d *Decoder) Decode(data []byte, req *http.Request) (logs []*protocol.Log, err error) {
if common.ProtocolOTLPLogV1 == d.Format {
otlpLogReq := plogotlp.NewRequest()
switch req.Header.Get("Content-Type") {
case pbContentType:
if err = otlpLogReq.UnmarshalProto(data); err != nil {
return logs, err
}
logs, err = d.ConvertOtlpLogV1(otlpLogReq)
case jsonContentType:
if err = otlpLogReq.UnmarshalJSON(data); err != nil {
return logs, err
}
logs, err = d.ConvertOtlpLogV1(otlpLogReq)
default:
err = errors.New("Invalid ContentType: " + req.Header.Get("Content-Type"))
}
} else {
err = errors.New("Invalid RequestURI: " + req.RequestURI)
}
return logs, err
}

func (d *Decoder) ConvertOtlpLogV1(otlpLogReq plogotlp.Request) (logs []*protocol.Log, err error) {
resLogs := otlpLogReq.Logs().ResourceLogs()
for i := 0; i < resLogs.Len(); i++ {
resourceLog := resLogs.At(i)
sLogs := resourceLog.ScopeLogs()
for j := 0; j < sLogs.Len(); j++ {
scopeLog := sLogs.At(j)
lRecords := scopeLog.LogRecords()
for k := 0; k < lRecords.Len(); k++ {
logRecord := lRecords.At(k)

protoContents := []*protocol.Log_Content{
{
Key: "time_unix_nano",
Value: strconv.FormatInt(logRecord.Timestamp().AsTime().UnixNano(), 10),
},
{
Key: "severity_number",
Value: strconv.FormatInt(int64(logRecord.SeverityNumber()), 10),
},
{
Key: "severity_text",
Value: logRecord.SeverityText(),
},
{
Key: "content",
Value: logRecord.Body().AsString(),
},
}

if logRecord.Attributes().Len() != 0 {
if d, err := json.Marshal(logRecord.Attributes().AsRaw()); err == nil {
protoContents = append(protoContents, &protocol.Log_Content{
Key: "attributes",
Value: string(d),
})
}
}

if resourceLog.Resource().Attributes().Len() != 0 {
if d, err := json.Marshal(resourceLog.Resource().Attributes().AsRaw()); err == nil {
protoContents = append(protoContents, &protocol.Log_Content{
Key: "resources",
Value: string(d),
})
}
}

protoLog := &protocol.Log{
Time: uint32(logRecord.Timestamp().AsTime().Unix()),
Contents: protoContents,
}
logs = append(logs, protoLog)
}
}
}

return logs, nil
}

func (d *Decoder) ParseRequest(res http.ResponseWriter, req *http.Request, maxBodySize int64) (data []byte, statusCode int, err error) {
return common.CollectBody(res, req, maxBodySize)
}
Loading