Skip to content

Commit

Permalink
feat: http flusher support variable config in Header (#643)
Browse files Browse the repository at this point in the history
* feat: http flusher support variable config in Header

* fix: lint

* doc: http flusher header support variables

* doc: add docs
  • Loading branch information
snakorse authored Feb 14, 2023
1 parent a7ff0a6 commit 39237b8
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ your changes, such as:
- [public] [both] [updated] grok processor reports unmatched errors by default
- [public] [both] [fixed] grok processor gets stuck with Chinese
- [public] [both] [fixed] fix plugin version in logs
- [public] [both] [updated] flusher http support variable config in request header
33 changes: 17 additions & 16 deletions docs/cn/data-pipeline/flusher/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,30 @@

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
|------------------------------|--------------------| -------- |---------------------------------------------------------------------------------------------------------------------------------------------------|
| Type | String || 插件类型,固定为`flusher_http` |
| RemoteURL | String || 要发送到的URL地址,示例:`http://localhost:8086/write` |
| Headers | Map<String,String> || 发送时附加的http请求header,如可添加 Authorization、Content-Type等信息 |
| Query | Map<String,String> || 发送时附加到url上的query参数,支持动态变量写法,如`{"db":"%{tag.db}"}`<p>v2版本支持从Group的Metadata或者Group.Tags中获取动态变量,如`{"db":"%{metadata.db}"}`或者`{"db":"%{tag.db}"}`</p> |
| Timeout | String || 请求的超时时间,默认 `60s` |
| Retry.Enable | Boolean || 是否开启失败重试,默认为 `true` |
| Retry.MaxRetryTimes | Int || 最大重试次数,默认为 `3` |
| Retry.InitialDelay | String || 首次重试时间间隔,默认为 `1s`,重试间隔以会2的倍数递增 |
| Retry.MaxDelay | String || 最大重试时间间隔,默认为 `30s` |
| Convert | Struct || ilogtail数据转换协议配置 |
| Convert.Protocol | String || ilogtail数据转换协议,可选值:`custom_single`,`influxdb`。默认值:`custom_single`<p>v2版本可选值:`raw`</p> |
| Convert.Encoding | String || ilogtail flusher数据转换编码,可选值:`json`, `custom`,默认值:`json` |
| 参数 | 类型 | 是否必选 | 说明 |
|------------------------------|--------------------| -------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Type | String || 插件类型,固定为`flusher_http` |
| RemoteURL | String || 要发送到的URL地址,示例:`http://localhost:8086/write` |
| Headers | Map<String,String> || 发送时附加的http请求header,如可添加 Authorization、Content-Type等信息,支持动态变量写法,如`{"x-db":"%{tag.db}"}`<p>v2版本支持从Group的Metadata或者Group.Tags中获取动态变量,如`{"x-db":"%{metadata.db}"}`或者`{"x-db":"%{tag.db}"}`</p> |
| Query | Map<String,String> || 发送时附加到url上的query参数,支持动态变量写法,如`{"db":"%{tag.db}"}`<p>v2版本支持从Group的Metadata或者Group.Tags中获取动态变量,如`{"db":"%{metadata.db}"}`或者`{"db":"%{tag.db}"}`</p> |
| Timeout | String || 请求的超时时间,默认 `60s` |
| Retry.Enable | Boolean || 是否开启失败重试,默认为 `true` |
| Retry.MaxRetryTimes | Int || 最大重试次数,默认为 `3` |
| Retry.InitialDelay | String || 首次重试时间间隔,默认为 `1s`,重试间隔以会2的倍数递增 |
| Retry.MaxDelay | String || 最大重试时间间隔,默认为 `30s` |
| Convert | Struct || ilogtail数据转换协议配置 |
| Convert.Protocol | String || ilogtail数据转换协议,可选值:`custom_single`,`influxdb`。默认值:`custom_single`<p>v2版本可选值:`raw`</p> |
| Convert.Encoding | String || ilogtail flusher数据转换编码,可选值:`json`, `custom`,默认值:`json` |
| Convert.Separator | String || ilogtail数据转换时,PipelineGroupEvents中多个Events之间拼接使用的分隔符。如`\n`。若不设置,则默认不拼接Events,即每个Event作为独立请求向后发送。 默认值为空。<p>当前仅在`Convert.Protocol: raw`有效。</p> |
| Convert.IgnoreUnExpectedData | Boolean || ilogtail数据转换时,遇到非预期的数据的行为,true 跳过,false 报错。默认值 true |
| Convert.TagFieldsRename | Map<String,String> || 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map<String,String> || ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags``time` |
| Concurrency | Int || 向url发起请求的并发数,默认为`1` |



## 样例

采集`/home/test-log/`路径下的所有文件名匹配`*.log`规则的文件,并将采集结果以 `custom_single` 协议、`json`格式提交到 `http://localhost:8086/write`
且提交时,附加 header x-filepath,其值使用log中的 __Tag__:__path__ 的值

```
enable: true
Expand All @@ -41,6 +40,8 @@ inputs:
flushers:
- Type: flusher_http
RemoteURL: "http://localhost:8086/write"
Headers:
x-filepath: "%{tag.__path__}"
Convert:
Protocol: custom_single
Encoding: json
Expand Down
55 changes: 32 additions & 23 deletions plugins/flusher/http/flusher_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type FlusherHTTP struct {
Convert helper.ConvertConfig // Convert defines which protocol and format to convert to
Concurrency int // How many requests can be performed in concurrent

queryVarKeys []string
varKeys []string

context pipeline.Context
converter *converter.Converter
Expand Down Expand Up @@ -114,7 +114,7 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error {
go f.runFlushTask()
}

f.buildQueryVarKeys()
f.buildVarKeys()
f.fillRequestContentType()

logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initialized")
Expand Down Expand Up @@ -177,9 +177,9 @@ func (f *FlusherHTTP) convertAndFlush(data interface{}) error {
var err error
switch v := data.(type) {
case *protocol.LogGroup:
logs, varValues, err = f.converter.ToByteStreamWithSelectedFields(v, f.queryVarKeys)
logs, varValues, err = f.converter.ToByteStreamWithSelectedFields(v, f.varKeys)
case *models.PipelineGroupEvents:
logs, varValues, err = f.converter.ToByteStreamWithSelectedFieldsV2(v, f.queryVarKeys)
logs, varValues, err = f.converter.ToByteStreamWithSelectedFieldsV2(v, f.varKeys)
default:
return fmt.Errorf("unsupport data type")
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (f *FlusherHTTP) flush(data []byte, varValues map[string]string) (ok, retry
if len(f.Query) > 0 {
values := req.URL.Query()
for k, v := range f.Query {
if len(f.queryVarKeys) == 0 {
if len(f.varKeys) == 0 {
values.Add(k, v)
continue
}
Expand All @@ -268,6 +268,16 @@ func (f *FlusherHTTP) flush(data []byte, varValues map[string]string) (ok, retry
}

for k, v := range f.Headers {
if len(f.varKeys) == 0 {
req.Header.Add(k, v)
continue
}
fv, ferr := fmtstr.FormatTopic(varValues, v)
if ferr != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher format header fail, error", ferr)
} else {
v = *fv
}
req.Header.Add(k, v)
}
response, err := f.client.Do(req)
Expand Down Expand Up @@ -298,28 +308,27 @@ func (f *FlusherHTTP) flush(data []byte, varValues map[string]string) (ok, retry
}
}

func (f *FlusherHTTP) buildQueryVarKeys() {
var varKeys []string
for _, v := range f.Query {
keys, err := fmtstr.CompileKeys(v)
if err != nil {
logger.Warning(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init queryVarKeys fail, err", err)
}
for _, key := range keys {
var exists bool
for _, k := range varKeys {
if k == v {
exists = true
break
}
func (f *FlusherHTTP) buildVarKeys() {
cache := map[string]struct{}{}
defines := []map[string]string{f.Query, f.Headers}

for _, define := range defines {
for _, v := range define {
keys, err := fmtstr.CompileKeys(v)
if err != nil {
logger.Warning(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init varKeys fail, err", err)
}
if exists {
continue
for _, key := range keys {
cache[key] = struct{}{}
}
varKeys = append(varKeys, key)
}
}
f.queryVarKeys = varKeys

varKeys := make([]string, 0, len(cache))
for k := range cache {
varKeys = append(varKeys, k)
}
f.varKeys = varKeys
}

func (f *FlusherHTTP) fillRequestContentType() {
Expand Down
50 changes: 49 additions & 1 deletion plugins/flusher/http/flusher_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,51 @@ func TestHttpFlusherInit(t *testing.T) {
Convey("Then Init() should build the variable keys", func() {
err := flusher.Init(mockContext{})
So(err, ShouldBeNil)
So(flusher.queryVarKeys, ShouldResemble, []string{"var"})
So(flusher.varKeys, ShouldResemble, []string{"var"})
})
})

Convey("Given a http flusher with Headers contains variable ", t, func() {
flusher := &FlusherHTTP{
RemoteURL: "http://localhost:8086/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolCustomSingle,
Encoding: converter.EncodingJSON,
},
Timeout: defaultTimeout,
Concurrency: 1,
Headers: map[string]string{
"name": "_%{var}",
},
}
Convey("Then Init() should build the variable keys", func() {
err := flusher.Init(mockContext{})
So(err, ShouldBeNil)
So(flusher.varKeys, ShouldResemble, []string{"var"})
})
})

Convey("Given a http flusher with Query AND Headers contains variable ", t, func() {
flusher := &FlusherHTTP{
RemoteURL: "http://localhost:8086/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolCustomSingle,
Encoding: converter.EncodingJSON,
},
Timeout: defaultTimeout,
Concurrency: 1,
Query: map[string]string{
"name": "_%{var1}",
},
Headers: map[string]string{
"name": "_%{var2}",
"tt": "_%{var1}",
},
}
Convey("Then Init() should build the variable keys", func() {
err := flusher.Init(mockContext{})
So(err, ShouldBeNil)
So(flusher.varKeys, ShouldResemble, []string{"var1", "var2"})
})
})
}
Expand All @@ -96,6 +140,7 @@ func TestHttpFlusherFlush(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("POST", "http://test.com/write?db=mydb", func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "mydb", req.Header.Get("db"))
body, _ := ioutil.ReadAll(req.Body)
actualRequests = append(actualRequests, string(body))
return httpmock.NewStringResponse(200, "ok"), nil
Expand All @@ -112,6 +157,9 @@ func TestHttpFlusherFlush(t *testing.T) {
Query: map[string]string{
"db": "%{tag.db}",
},
Headers: map[string]string{
"db": "%{tag.db}",
},
}

err := flusher.Init(mockContext{})
Expand Down

0 comments on commit 39237b8

Please sign in to comment.