feat: add http flusher supporting influxdb protocol & add group aggregator supporting grouping logs by keys (#521)

* feat: add influxdb flusher & group aggregator

* feat: http flusher support concurrency setting
snakorse authored Dec 9, 2022
1 parent 2900354 commit ac73089
Showing 43 changed files with 2,954 additions and 53 deletions.
2 changes: 2 additions & 0 deletions docs/cn/SUMMARY.md
@@ -73,12 +73,14 @@
* [Aggregators](data-pipeline/aggregator/README.md)
  * [Basic](data-pipeline/aggregator/aggregator-base.md)
  * [Context](data-pipeline/aggregator/aggregator-context.md)
  * [Group by Key](data-pipeline/aggregator/aggregator-content-value-group.md)
* [Flushers](data-pipeline/flusher/README.md)
  * [Kafka (Deprecated)](data-pipeline/flusher/kafka.md)
  * [kafkaV2](data-pipeline/flusher/kafka_v2.md)
  * [SLS](data-pipeline/flusher/sls.md)
  * [Stdout/File](data-pipeline/flusher/stdout.md)
  * [OTLP Logs](data-pipeline/flusher/otlp-log.md)
  * [HTTP](data-pipeline/flusher/http.md)
* [Accelerators](data-pipeline/accelerator/README.md)
  * [Delimiter Accelerate](data-pipeline/accelerator/delimiter-accelerate.md)
  * [JSON Accelerate](data-pipeline/accelerator/json-accelerate.md)
62 changes: 62 additions & 0 deletions docs/cn/data-pipeline/aggregator/aggregator-content-value-group.md
@@ -0,0 +1,62 @@
# Group by Key

## Introduction

The `aggregator_content_value_group` aggregator plugin groups individual logs by the values of the specified keys.

## Configuration Parameters

| Parameter        | Type     | Required | Description |
| ---------------- | -------- | -------- | ------------------------------------------------------------------------------------------------------------------ |
| Type             | String   || Plugin type, fixed as `aggregator_content_value_group` |
| GroupKeys        | []String || List of keys whose values are used to group logs |
| EnablePackID     | Boolean  || Whether to add the __pack_id__ field to the LogGroup's LogTag. If this parameter is omitted, the __pack_id__ field is added to the LogGroup's LogTag by default. |
| Topic            | String   || Topic name of the LogGroup. If this parameter is omitted, each LogGroup's topic name defaults to empty. |
| ErrIfKeyNotFound | Boolean  || Whether to log an error when a specified key is not found in a log's Contents |
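
The grouping behavior described above can be sketched in Go. This is an illustrative sketch, not the plugin's actual source; the bucket-key format (joining `key=value` pairs with `|`) is an assumption made for the example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// groupKey builds a stable bucket key from the configured GroupKeys.
// The "k=v|k=v" format is illustrative, not the plugin's internal format.
func groupKey(log map[string]string, groupKeys []string) string {
	parts := make([]string, 0, len(groupKeys))
	for _, k := range groupKeys {
		parts = append(parts, k+"="+log[k])
	}
	return strings.Join(parts, "|")
}

// groupLogs buckets logs whose values for the configured keys match.
func groupLogs(logs []map[string]string, groupKeys []string) map[string][]map[string]string {
	groups := make(map[string][]map[string]string)
	for _, l := range logs {
		k := groupKey(l, groupKeys)
		groups[k] = append(groups[k], l)
	}
	return groups
}

func main() {
	logs := []map[string]string{
		{"url": "/PutData", "method": "POST", "status": "200"},
		{"url": "/PutData", "method": "POST", "status": "500"},
		{"url": "/GetData", "method": "GET", "status": "200"},
	}
	groups := groupLogs(logs, []string{"url", "method"})
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, len(groups[k]))
	}
}
```

Logs sharing the same `url` and `method` values land in the same bucket, which is then emitted as one LogGroup.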

## Example

Collect all files under `/home/test-log/` whose names match `reg.log`, extract fields with `processor_regex`, then aggregate by the `url` and `method` fields, and send the results to SLS.


* Input

```bash
echo '127.0.0.1 - - [10/Aug/2017:14:57:51 +0800] "POST /PutData?Category=YunOsAccountOpLog" 0.024 18204 200 37 "-" "aliyun-sdk-java"' >> /home/test-log/reg.log
```


* Collection configuration

```yaml
enable: true
inputs:
- Type: file_log
LogPath: /home/test-log/
FilePattern: "reg.log"
processors:
- Type: processor_regex
SourceKey: content
Regex: ([\d\.]+) \S+ \S+ \[(\S+) \S+\] \"(\w+) ([^\\"]*)\" ([\d\.]+) (\d+) (\d+) (\d+|-) \"([^\\"]*)\" \"([^\\"]*)\"
Keys:
- ip
- time
- method
- url
- request_time
- request_length
- status
- length
- ref_url
- browser
aggregators:
- Type: aggregator_content_value_group
GroupKeys:
- url
- method
flushers:
- Type: flusher_sls
Endpoint: cn-xxx.log.aliyuncs.com
ProjectName: test_project
LogstoreName: test_logstore
```
48 changes: 48 additions & 0 deletions docs/cn/data-pipeline/flusher/http.md
@@ -0,0 +1,48 @@
# HTTP

## Introduction

The `flusher_http` flusher plugin sends the processed collected data to a specified address over HTTP.

## Configuration Parameters

| Parameter                    | Type               | Required | Description |
| ---------------------------- | ------------------ | -------- | --------------------------------------------------------------------------------- |
| Type                         | String             || Plugin type, fixed as `flusher_http` |
| RemoteURL                    | String             || Destination URL, e.g. `http://localhost:8086/write` |
| Headers                      | Map<String,String> || Extra HTTP request headers, e.g. Authorization, Content-Type |
| Query                        | Map<String,String> || Extra query parameters appended to the URL; dynamic variables are supported, e.g. `{"db":"%{tag.db}"}` |
| Timeout                      | String             || Request timeout, default `60s` |
| Retry.Enable                 | Boolean            || Whether to retry on failure, default `true` |
| Retry.MaxRetryTimes          | Int                || Maximum number of retries, default `3` |
| Retry.InitialDelay           | String             || Initial retry interval, default `1s`; the interval doubles with each retry |
| Retry.MaxDelay               | String             || Maximum retry interval, default `30s` |
| Convert                      | Struct             || ilogtail data conversion protocol configuration |
| Convert.Protocol             | String             || ilogtail data conversion protocol; options: `custom_single`, `influxdb`; default: `custom_single` |
| Convert.Encoding             | String             || ilogtail flusher data conversion encoding; options: `json`, `custom`; default: `json` |
| Convert.TagFieldsRename      | Map<String,String> || Renames JSON fields in the log's tags |
| Convert.ProtocolFieldsRename | Map<String,String> || Renames ilogtail log protocol fields; currently renamable fields: `contents`, `tags`, `time` |
| Concurrency                  | Int                || Number of concurrent requests to the URL, default `1` |



## Example

Collect all files under `/home/test-log/` whose names match `*.log`, and ship the results to `http://localhost:8086/write` using the `custom_single` protocol in `json` encoding.

```yaml
enable: true
inputs:
- Type: file_log
LogPath: /home/test-log/
FilePattern: "*.log"
flushers:
- Type: flusher_http
RemoteURL: "http://localhost:8086/write"
Convert:
Protocol: custom_single
Encoding: json
```



19 changes: 10 additions & 9 deletions docs/cn/data-pipeline/input/service-http-service.md
@@ -6,15 +6,16 @@

## Configuration Parameters

| Parameter          | Type    | Required | Description |
| ------------------ | ------- | -------- | ------------------------------------------------------------------------------------------------------------------ |
| Type               | String  || Plugin type, fixed as `service_http_server` |
| Format             | String  || <p>Data format.</p> <p>Supported formats: `sls`, `prometheus`, `influxdb`, `otlp_logv1`, `statsd`</p> |
| Address            | String  || <p>Listen address.</p><p></p> |
| ReadTimeoutSec     | String  || <p>Read timeout.</p><p>Default: `10s`.</p> |
| ShutdownTimeoutSec | String  || <p>Shutdown timeout.</p><p>Default: `5s`.</p> |
| MaxBodySize        | String  || <p>Maximum body size.</p><p>Default: `64k`.</p> |
| UnlinkUnixSock     | String  || <p>If the listen address is a unix socket, whether to force-release it before starting.</p><p>Default: `true`.</p> |
| FieldsExtend       | Boolean || <p>Whether to support data types other than integer (e.g. String).</p><p>Currently effective only for the influxdb Format, which carries extra types such as String and Bool.</p> |

## Example

5 changes: 5 additions & 0 deletions docs/cn/data-pipeline/overview.md
@@ -44,6 +44,10 @@

## Aggregators

| Name | Contributor | Description |
|-------------------------------|------------------------------------------------------|---------------------------------------------|
| `aggregator_content_value_group` | Community<br>[`snakorse`](https://github.com/snakorse) | Groups and aggregates collected data by the specified keys |

## Flushers

| Name | Contributor | Description |
@@ -53,6 +57,7 @@
| `flusher_sls`<br>SLS | SLS official | Flushes collected data to SLS. |
| `flusher_stdout`<br>Stdout/File | SLS official | Flushes collected data to stdout or a file. |
| `flusher_otlp_log`<br>OTLP Logs | Community<br>[`liuhaoyang`](https://github.com/liuhaoyang) | Flushes collected data to backends that support the `Opentelemetry log protocol`. |
| `flusher_http`<br>HTTP | Community<br>[`snakorse`](https://github.com/snakorse) | Flushes collected data to a specified backend over HTTP. |

## Accelerators

1 change: 1 addition & 0 deletions docs/cn/developer-guide/log-protocol/README.md
@@ -8,3 +8,4 @@ By default, iLogtail exchanges log data with the outside world in the sls custom protocol format
| ------- | ------- | ------- |
| Standard protocol | [sls protocol](./protocol-spec/sls.md) | json, protobuf |
| Custom protocol | [Single-log protocol](./protocol-spec/custom_single.md) | json |
| Standard protocol | [Influxdb protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/) | custom |
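
For reference, a point in InfluxDB line protocol has the shape `measurement[,tag=value...] field=value[,...] timestamp`. A minimal Go sketch of the rendering follows; it sorts keys for determinism, supports float fields only, and skips the escaping of spaces and commas that the real protocol requires.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// sortedKeys returns the map's keys in sorted order for stable output.
func sortedKeys(m map[string]string) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// toLineProtocol renders one point as InfluxDB line protocol:
//   measurement[,tag=value...] field=value[,...] timestamp
// Simplified: no escaping, float fields only.
func toLineProtocol(measurement string, tags map[string]string, fields map[string]float64, ts int64) string {
	var b strings.Builder
	b.WriteString(measurement)
	for _, k := range sortedKeys(tags) {
		b.WriteString("," + k + "=" + tags[k])
	}
	fieldKeys := make([]string, 0, len(fields))
	for k := range fields {
		fieldKeys = append(fieldKeys, k)
	}
	sort.Strings(fieldKeys)
	parts := make([]string, 0, len(fieldKeys))
	for _, k := range fieldKeys {
		parts = append(parts, k+"="+strconv.FormatFloat(fields[k], 'g', -1, 64))
	}
	b.WriteString(" " + strings.Join(parts, ",") + " " + strconv.FormatInt(ts, 10))
	return b.String()
}

func main() {
	fmt.Println(toLineProtocol("cpu",
		map[string]string{"host": "dev"},
		map[string]float64{"usage": 0.64},
		1670000000000000000))
	// → cpu,host=dev usage=0.64 1670000000000000000
}
```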
2 changes: 2 additions & 0 deletions docs/cn/developer-guide/log-protocol/converter.md
@@ -110,13 +110,15 @@ c, err := protocol.NewConverter("custom_single", "json", map[string]string{"host
| Protocol | Meaning |
| ------ | ------ |
| custom_single | Single-log protocol |
| influxdb | Influxdb protocol |

- Supported encodings

| Encoding | Meaning |
| ------ | ------ |
| json | json encoding |
| protobuf | protobuf encoding |
| custom | custom encoding |

- Default keys of system-reserved LogTags in the sls protocol:

5 changes: 4 additions & 1 deletion go.mod
@@ -27,6 +27,7 @@ require (
github.com/influxdata/go-syslog v1.0.1
github.com/influxdata/influxdb v1.10.0
github.com/jackc/pgx/v4 v4.16.1
github.com/jarcoal/httpmock v1.2.0
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7
github.com/json-iterator/go v1.1.12
github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68
@@ -41,6 +42,7 @@
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.37.0
github.com/prometheus/procfs v0.8.0
github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9
github.com/shirou/gopsutil v3.21.3+incompatible
github.com/siddontang/go-mysql v0.0.0-20180725024449-535abe8f2eba
github.com/sirupsen/logrus v1.8.1
@@ -74,6 +76,7 @@
github.com/frankban/quicktest v1.14.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/intel/goresctrl v0.2.0 // indirect
github.com/miekg/pkcs11 v1.1.1 // indirect
github.com/moby/locker v1.0.1 // indirect
@@ -138,7 +141,7 @@
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect