Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prometheus decoder optimize #866

Merged
merged 14 commits into from
May 26, 2023
37 changes: 19 additions & 18 deletions docs/cn/data-pipeline/input/service-http-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,26 @@

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
|--------------------|-------------------|------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Type | String | 是 | 插件类型,固定为`service_http_server` |
| 参数 | 类型 | 是否必选 | 说明 |
|--------------------|-------------------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Type | String | 是 | 插件类型,固定为`service_http_server` |
| Format | String | 否 | <p>数据格式。</p> <p>支持格式:`sls`、`prometheus`、`influxdb`、`otlp_logv1`、 `otlp_metricv1`、`pyroscope`、`statsd`</p> <p>v2版本支持格式:`raw`、`prometheus`、`otlp_logv1`、`otlp_metricv1`、`otlp_tracev1`</p><p>说明:`raw`格式以原始请求字节流传输数据</p> |
| Address | String | 否 | <p>监听地址。</p><p></p> |
| Path | String | 否 | <p>接收端点, 如Format 为 `otlp_logv1` 时, 默认端点为`/v1/logs` 。</p><p></p> |
| ReadTimeoutSec | String | 否 | <p>读取超时时间。</p><p>默认取值为:`10s`。</p> |
| ShutdownTimeoutSec | String | 否 | <p>关闭超时时间。</p><p>默认取值为:`5s`。</p> |
| MaxBodySize | String | 否 | <p>最大传输 body 大小。</p><p>默认取值为:`64k`。</p> |
| UnlinkUnixSock | String | 否 | <p>启动前如果监听地址为unix socket,是否进行强制释放。</p><p>默认取值为:`true`。</p> |
| FieldsExtend | Boolean | 否 | <p>是否支持非integer以外的数据类型(如String)</p><p>目前仅针对有 String、Bool 等额外类型的 influxdb Format 有效</p> |
| QueryParams | []String | 否 | 需要解析到Group.Metadata中的请求参数。<p>解析结果会以KeyValue放入Metadata。默认取值为`[]`,即不解析。</p><p>仅v2版本有效</p> |
| QueryParamPrefix | String | 否 | 解析请求参数时需要添加的key前缀,如`_query_param_`。<p>前缀会直接拼接在每个QueryParam前,无额外连接符,默认取值为空,即不增加前缀。</p><p>仅v2版本有效</p> |
| HeaderParams | []String | 否 | 需要解析到Group.Metadata中的header参数。<p>解析结果会以KeyValue放入Metadata。默认取值为`[]`,即不解析。</p><p>仅v2版本有效</p> |
| HeaderParamPrefix | String | 否 | 解析Header参数时需要添加的key前缀,如`_header_param_`。<p>前缀会直接拼接在每个HeaderParam前,无额外连接符,默认取值为空,即不增加前缀。</p><p>仅v2版本有效</p> |
| DisableUncompress | Boolean | 否 | 禁用对于请求数据的解压缩, 默认取值为:`false`<p>目前仅针对Raw Format有效</p><p>仅v2版本有效</p> |
| Tags | map[String]String | 否 | 输出数据默认携带标签<p>仅v1版本有效</p> |
| DumpData | Boolean | 否 | [开发使用] 将接收的请求存储于本地文件, 默认取值为:`false` |
| DumpDataKeepFiles | Int | 否 | [开发使用] Dump文件保留文件数目, 文件按小时滚动, 此参数默认值为5, 表示保留5小时Dump 参数 |
| Address | String | 否 | <p>监听地址。</p><p></p> |
| Path | String | 否 | <p>接收端点, 如Format 为 `otlp_logv1` 时, 默认端点为`/v1/logs` 。</p><p></p> |
| ReadTimeoutSec | String | 否 | <p>读取超时时间。</p><p>默认取值为:`10s`。</p> |
| ShutdownTimeoutSec | String | 否 | <p>关闭超时时间。</p><p>默认取值为:`5s`。</p> |
| MaxBodySize | String | 否 | <p>最大传输 body 大小。</p><p>默认取值为:`64k`。</p> |
| UnlinkUnixSock | String | 否 | <p>启动前如果监听地址为unix socket,是否进行强制释放。</p><p>默认取值为:`true`。</p> |
| FieldsExtend | Boolean | 否 | <p>是否支持非integer以外的数据类型(如String)</p><p>目前仅针对有 String、Bool 等额外类型的 influxdb Format 有效</p> |
| QueryParams | []String | 否 | 需要解析到Group.Metadata中的请求参数。<p>解析结果会以KeyValue放入Metadata。默认取值为`[]`,即不解析。</p><p>仅v2版本有效</p> |
| QueryParamPrefix | String | 否 | 解析请求参数时需要添加的key前缀,如`_query_param_`。<p>前缀会直接拼接在每个QueryParam前,无额外连接符,默认取值为空,即不增加前缀。</p><p>仅v2版本有效</p> |
| HeaderParams | []String | 否 | 需要解析到Group.Metadata中的header参数。<p>解析结果会以KeyValue放入Metadata。默认取值为`[]`,即不解析。</p><p>仅v2版本有效</p> |
| HeaderParamPrefix | String | 否 | 解析Header参数时需要添加的key前缀,如`_header_param_`。<p>前缀会直接拼接在每个HeaderParam前,无额外连接符,默认取值为空,即不增加前缀。</p><p>仅v2版本有效</p> |
| DisableUncompress | Boolean | 否 | 禁用对于请求数据的解压缩, 默认取值为:`false`<p>目前仅针对Raw Format有效</p><p>仅v2版本有效</p> |
| Tags | map[String]String | 否 | 输出数据默认携带标签<p>仅v1版本有效</p> |
| DumpData | Boolean | 否 | [开发使用] 将接收的请求存储于本地文件, 默认取值为:`false` |
| DumpDataKeepFiles | Int | 否 | [开发使用] Dump文件保留文件数目, 文件按小时滚动, 此参数默认值为5, 表示保留5小时Dump 参数 |
| AllowUnsafeMode | Boolean | 否 | 是否允许unsafe模式的Decode,启用该模式,Decoder将可能利用go unsafe技术来加速解码,目前仅当Format=prometheus时有效(注:暂不支持Exemplar、Histogram) |

## 样例

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ require (
github.com/VictoriaMetrics/metricsql v0.45.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/richardartoul/molecule v1.0.0 // indirect
github.com/valyala/fastjson v1.6.3 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down Expand Up @@ -1756,6 +1758,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180406214816-61147c48b25b/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down
1 change: 1 addition & 0 deletions licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ When distributed in a binary form, iLogtail may contain portions of the followin
- [github.com/mailru/easyjson](https://pkg.go.dev/github.com/mailru/easyjson?tab=licenses)
- [github.com/mitchellh/mapstructure](https://pkg.go.dev/github.com/mitchellh/mapstructure?tab=licenses)
- [github.com/paulbellamy/ratecounter](https://pkg.go.dev/github.com/paulbellamy/ratecounter?tab=licenses)
- [github.com/richardartoul/molecule](https://pkg.go.dev/github.com/richardartoul/molecule?tab=licenses)
- [github.com/satori/go.uuid](https://pkg.go.dev/github.com/satori/go.uuid?tab=licenses)
- [github.com/shopspring/decimal](https://pkg.go.dev/github.com/shopspring/decimal?tab=licenses)
- [github.com/siddontang/go](https://pkg.go.dev/github.com/siddontang/go?tab=licenses)
Expand Down
1 change: 1 addition & 0 deletions pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9
github.com/pyroscope-io/jfr-parser v0.6.0
github.com/pyroscope-io/pyroscope v0.0.0-00010101000000-000000000000
github.com/richardartoul/molecule v1.0.0
github.com/smartystreets/goconvey v1.7.2
github.com/stretchr/testify v1.8.2
go.opentelemetry.io/collector/pdata v0.66.0
Expand Down
2 changes: 2 additions & 0 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,8 @@ github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9/go.mod h1:
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
38 changes: 33 additions & 5 deletions pkg/protocol/decoder/common/comon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package common

import (
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"

"github.com/golang/snappy"
"github.com/pierrec/lz4"
Expand All @@ -40,6 +42,23 @@ const (
ProtocolPyroscope = "pyroscope"
)

var bufPool = sync.Pool{
New: func() interface{} {
buf := bytes.NewBuffer(make([]byte, 0, 32*1024))
return buf
},
}

func GetPooledBuf() *bytes.Buffer {
buf := bufPool.Get().(*bytes.Buffer)
return buf
}

func PutPooledBuf(buf *bytes.Buffer) {
buf.Reset()
bufPool.Put(buf)
}

func CollectBody(res http.ResponseWriter, req *http.Request, maxBodySize int64) ([]byte, int, error) {
body := req.Body

Expand All @@ -54,16 +73,25 @@ func CollectBody(res http.ResponseWriter, req *http.Request, maxBodySize int64)
}

body = http.MaxBytesReader(res, body, maxBodySize)
bytes, err := ioutil.ReadAll(body)
if err != nil {
return nil, http.StatusRequestEntityTooLarge, err
}

if req.Header.Get("Content-Encoding") == "snappy" {
bytes, err = snappy.Decode(nil, bytes)
// for snappy encoding, use pooled buf to read compressed request body
buf := GetPooledBuf()
defer PutPooledBuf(buf)
_, err := io.Copy(buf, body) // nolint
if err != nil {
return nil, http.StatusBadRequest, err
}
data, err := snappy.Decode(nil, buf.Bytes())
if err != nil {
return nil, http.StatusBadRequest, err
}
return data, http.StatusOK, nil
}

bytes, err := ioutil.ReadAll(body)
if err != nil {
return nil, http.StatusRequestEntityTooLarge, err
}

if req.Header.Get("x-log-compresstype") == "lz4" {
Expand Down
3 changes: 2 additions & 1 deletion pkg/protocol/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type Option struct {
FieldsExtend bool
DisableUncompress bool
AllowUnsafeMode bool
}

// GetDecoder return a new decoder for specific format
Expand All @@ -45,7 +46,7 @@ func GetDecoderWithOptions(format string, option Option) (extensions.Decoder, er
case common.ProtocolSLS:
return &sls.Decoder{}, nil
case common.ProtocolPrometheus:
return &prometheus.Decoder{}, nil
return &prometheus.Decoder{AllowUnsafeMode: option.AllowUnsafeMode}, nil
case common.ProtocolInflux, common.ProtocolInfluxdb:
return &influxdb.Decoder{FieldsExtend: option.FieldsExtend}, nil
case common.ProtocolStatsd:
Expand Down
Loading