feat: http flusher add Convert.Separator config field (#616)
* feat: http flusher add Convert.Separator config field

* fix: add docs for http flusher convert separator

* feat: add docs for raw protocol
urnotsally authored Feb 2, 2023
1 parent 6cb1ff3 commit 77b65fb
Showing 9 changed files with 249 additions and 21 deletions.
5 changes: 3 additions & 2 deletions docs/cn/data-pipeline/flusher/http.md
@@ -6,8 +6,8 @@

## Configuration Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| Type                         | String             || Plugin type; fixed as `flusher_http` |
| RemoteURL                    | String             || The URL to send data to, e.g. `http://localhost:8086/write` |
| Headers                      | Map<String,String> || Extra HTTP request headers attached when sending, e.g. Authorization or Content-Type |
@@ -20,6 +20,7 @@
| Convert                      | Struct             || ilogtail data conversion protocol configuration |
| Convert.Protocol             | String             || ilogtail data conversion protocol; options: `custom_single`, `influxdb`. Default: `custom_single`. <p>For the v2 pipeline, `raw` is also available.</p> |
| Convert.Encoding             | String             || ilogtail flusher data conversion encoding; options: `json`, `custom`. Default: `json` |
| Convert.Separator            | String             || Separator used to join the Events of a PipelineGroupEvents during conversion, e.g. `\n`. If unset, Events are not joined and each Event is sent downstream as a separate request. Default: empty. <p>Currently only effective with `Convert.Protocol: raw`.</p> |
| Convert.TagFieldsRename      | Map<String,String> || Rename JSON fields inside the log tags |
| Convert.ProtocolFieldsRename | Map<String,String> || Rename ilogtail log protocol fields; the fields that can currently be renamed are `contents`, `tags` and `time` |
| Concurrency                  | Int                || Number of concurrent requests to the URL. Default: `1` |
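As a quick illustration of the new field, here is a minimal sketch in Go of the `Convert` block above, using the `ConvertConfig` struct added in `helper/converter_helper.go` later in this commit (the import path is assumed from the repository layout, and the values are illustrative):

```go
package main

import (
    "fmt"

    "github.com/alibaba/ilogtail/helper" // import path assumed from the repository layout
)

func main() {
    // Convert block for flusher_http: raw protocol, custom encoding, and a
    // separator so that all events of one PipelineGroupEvents are joined into
    // a single request body. Leave Separator empty to send one request per event.
    cfg := helper.ConvertConfig{
        Protocol:  "raw",
        Encoding:  "custom",
        Separator: "\n",
    }
    fmt.Printf("%+v\n", cfg)
}
```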
11 changes: 6 additions & 5 deletions docs/cn/developer-guide/log-protocol/README.md
@@ -4,8 +4,9 @@ By default, iLogtail exchanges log data with external systems in the form of its sls custom protocol

The protocols currently supported for iLogtail log data and their corresponding encodings are shown in the table below; protocol types are divided into custom protocols and standard protocols:

| Protocol Type        | Protocol Name                                                                                      | Supported Encodings |
|----------------------|----------------------------------------------------------------------------------------------------|---------------------|
| Standard protocol    | [sls protocol](./protocol-spec/sls.md)                                                             | json, protobuf      |
| Custom protocol      | [Single-record protocol](./protocol-spec/custom_single.md)                                         | json                |
| Standard protocol    | [Influxdb protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/) | custom        |
| Byte-stream protocol | [raw protocol](./protocol-spec/raw.md)                                                             | custom              |
30 changes: 30 additions & 0 deletions docs/cn/developer-guide/log-protocol/protocol-spec/raw.md
@@ -0,0 +1,30 @@
# raw protocol

The in-memory data structures that back the raw protocol are defined as follows:

## ByteArray & GroupInfo

The raw byte stream of the transported data; an implementation of `PipelineEvent`.

```go
type ByteArray []byte
```

The tags and metadata attached to the transported data, as simple key/value pairs.

```go
type GroupInfo struct {
    Metadata Metadata
    Tags     Tags
}
```

## PipelineGroupEvents
The transported data as a whole: a `GroupInfo` plus its list of `PipelineEvent`s.

```go
type PipelineGroupEvents struct {
    Group  *GroupInfo
    Events []PipelineEvent
}
```
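For orientation, a small sketch of how these types are assembled, mirroring the constructors used by the tests later in this commit (`models.NewGroup`, `models.NewMetadataWithMap`, `models.ByteArray`); the `pkg/models` import path is assumed from the repository layout:

```go
package main

import (
    "fmt"

    "github.com/alibaba/ilogtail/pkg/models" // import path assumed from the repository layout
)

func main() {
    // Two influxdb line-protocol payloads carried as ByteArray events,
    // grouped under shared metadata {db: test}.
    group := models.NewGroup(models.NewMetadataWithMap(map[string]string{"db": "test"}), nil)
    groupEvents := &models.PipelineGroupEvents{
        Group: group,
        Events: []models.PipelineEvent{
            models.ByteArray([]byte("cpu.load.short,host=server01,region=cn value=0.6")),
            models.ByteArray([]byte("cpu.load.short,host=server01,region=cn value=0.2")),
        },
    }
    fmt.Println(len(groupEvents.Events)) // 2
}
```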
1 change: 1 addition & 0 deletions helper/converter_helper.go
@@ -17,6 +17,7 @@ package helper
type ConvertConfig struct {
    TagFieldsRename      map[string]string // Rename one or more fields from tags.
    ProtocolFieldsRename map[string]string // Rename one or more fields. The protocol field options can only be: contents, tags, time.
    Separator            string            // Separator used to join events during conversion (currently effective for the raw protocol).
    Protocol             string            // Convert protocol
    Encoding             string            // Convert encoding
}
10 changes: 10 additions & 0 deletions pkg/protocol/converter/converter.go
@@ -113,10 +113,20 @@ var supportedEncodingMap = map[string]map[string]bool{
type Converter struct {
    Protocol             string
    Encoding             string
    Separator            string
    TagKeyRenameMap      map[string]string
    ProtocolKeyRenameMap map[string]string
}

func NewConverterWithSep(protocol, encoding, sep string, tagKeyRenameMap, protocolKeyRenameMap map[string]string) (*Converter, error) {
    converter, err := NewConverter(protocol, encoding, tagKeyRenameMap, protocolKeyRenameMap)
    if err != nil {
        return nil, err
    }
    converter.Separator = sep
    return converter, nil
}

func NewConverter(protocol, encoding string, tagKeyRenameMap, protocolKeyRenameMap map[string]string) (*Converter, error) {
    enc, ok := supportedEncodingMap[protocol]
    if !ok {
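A brief usage sketch of the new constructor, written as if it lived inside this converter package (so `ProtocolRaw` and `EncodingCustom` are referenced unqualified, exactly as `converter_raw_test.go` does); the helper function name is hypothetical:

```go
// newRawConverter is a sketch: it builds a raw/custom converter that joins the
// events of a PipelineGroupEvents with "\n". The nil maps mean no tag or
// protocol-field renaming. Passing an empty separator instead makes
// ConvertToRawStream emit one payload per event (see getByteStream).
func newRawConverter() (*Converter, error) {
    c, err := NewConverterWithSep(ProtocolRaw, EncodingCustom, "\n", nil, nil)
    if err != nil {
        return nil, err
    }
    return c, nil
}
```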
39 changes: 32 additions & 7 deletions pkg/protocol/converter/converter_raw.go
@@ -26,23 +26,48 @@ func (c *Converter) ConvertToRawStream(groupEvents *models.PipelineGroupEvents,
        return nil, nil, nil
    }

    byteStream := *GetPooledByteBuf()
    var targetValues map[string]string
    if len(targetFields) > 0 {
        targetValues = findTargetFieldsInGroup(targetFields, groupEvents.Group)
    }

    if len(c.Separator) == 0 {
        return getByteStream(groupEvents, targetValues)
    }

    return getByteStreamWithSep(groupEvents, targetValues, c.Separator)
}

func getByteStreamWithSep(groupEvents *models.PipelineGroupEvents, targetValues map[string]string, sep string) (stream [][]byte, values []map[string]string, err error) {
    joinedStream := *GetPooledByteBuf()
    for idx, event := range groupEvents.Events {
        eventType := event.GetType()
        if eventType != models.EventTypeByteArray {
            return nil, nil, fmt.Errorf("unsupported event type %v", eventType)
        }
        if idx != 0 {
            byteStream = append(byteStream, '\n')
            joinedStream = append(joinedStream, sep...)
        }
        byteStream = append(byteStream, event.(models.ByteArray)...)
        joinedStream = append(joinedStream, event.(models.ByteArray)...)
    }
    return [][]byte{joinedStream}, []map[string]string{targetValues}, nil
}

    var targetValues map[string]string
    if len(targetFields) > 0 {
        targetValues = findTargetFieldsInGroup(targetFields, groupEvents.Group)
func getByteStream(groupEvents *models.PipelineGroupEvents, targetValues map[string]string) (stream [][]byte, values []map[string]string, err error) {
    byteGroup := make([][]byte, 0, len(groupEvents.Events))
    valueGroup := make([]map[string]string, 0, len(groupEvents.Events))
    for _, event := range groupEvents.Events {
        eventType := event.GetType()
        if eventType != models.EventTypeByteArray {
            return nil, nil, fmt.Errorf("unsupported event type %v", eventType)
        }

        byteStream := *GetPooledByteBuf()
        byteStream = append(byteStream, event.(models.ByteArray)...)
        byteGroup = append(byteGroup, byteStream)
        valueGroup = append(valueGroup, targetValues)
    }
    return [][]byte{byteStream}, []map[string]string{targetValues}, nil
    return byteGroup, valueGroup, nil
}

func findTargetFieldsInGroup(targetFields []string, group *models.GroupInfo) map[string]string {
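To make the two code paths above concrete, here is a hedged sketch of the expected output shapes (taken from the tests that follow), again written as if inside this package so the existing `models` import applies; the function name is hypothetical and error handling is abbreviated:

```go
// demoRawStream contrasts the two conversion modes on the same group.
func demoRawStream(group *models.PipelineGroupEvents) error {
    // With a separator: all events are concatenated into one payload,
    // so the HTTP flusher sends a single request per PipelineGroupEvents.
    withSep, err := NewConverterWithSep(ProtocolRaw, EncodingCustom, "\n", nil, nil)
    if err != nil {
        return err
    }
    joined, _, err := withSep.ConvertToRawStream(group, []string{"metadata.db"})
    if err != nil {
        return err
    }
    // len(joined) == 1

    // Without a separator: each event becomes its own payload/request.
    noSep, err := NewConverter(ProtocolRaw, EncodingCustom, nil, nil)
    if err != nil {
        return err
    }
    split, _, err := noSep.ConvertToRawStream(group, []string{"metadata.db"})
    if err != nil {
        return err
    }
    // len(split) == len(group.Events)
    _, _ = joined, split
    return nil
}
```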
83 changes: 80 additions & 3 deletions pkg/protocol/converter/converter_raw_test.go
@@ -13,6 +13,7 @@ func TestConverter_ConvertToRawStream(t *testing.T) {
type fields struct {
Protocol string
Encoding string
Separator string
TagKeyRenameMap map[string]string
ProtocolKeyRenameMap map[string]string
}
@@ -21,8 +22,9 @@
targetFields []string
}
mockValidFields := fields{
Protocol: ProtocolRaw,
Encoding: EncodingCustom,
Protocol: ProtocolRaw,
Encoding: EncodingCustom,
Separator: "\n",
}
mockInvalidFields := fields{
Protocol: ProtocolRaw,
@@ -57,7 +59,7 @@
Events: []models.PipelineEvent{models.ByteArray(mockByteEvent), models.ByteArray(mockByteEvent)}},
targetFields: []string{"metadata.db"},
},
wantStream: [][]byte{append(append(mockByteEvent, '\n'), mockByteEvent...)},
wantStream: [][]byte{append(append(mockByteEvent, "\n"...), mockByteEvent...)},
wantValues: []map[string]string{{"metadata.db": "test"}},
wantErr: assert.NoError,
}, {
@@ -83,6 +85,81 @@
c := &Converter{
Protocol: tt.fields.Protocol,
Encoding: tt.fields.Encoding,
Separator: tt.fields.Separator,
TagKeyRenameMap: tt.fields.TagKeyRenameMap,
ProtocolKeyRenameMap: tt.fields.ProtocolKeyRenameMap,
}
gotStream, gotValues, err := c.ConvertToRawStream(tt.args.groupEvents, tt.args.targetFields)
if !tt.wantErr(t, err, fmt.Sprintf("ConvertToRawStream(%v, %v)", tt.args.groupEvents, tt.args.targetFields)) {
return
}
assert.Equalf(t, tt.wantStream, gotStream, "ConvertToRawStream(%v, %v)", tt.args.groupEvents, tt.args.targetFields)
assert.Equalf(t, tt.wantValues, gotValues, "ConvertToRawStream(%v, %v)", tt.args.groupEvents, tt.args.targetFields)
})
}
}

func TestConverter_ConvertToRawStreamSeparator(t *testing.T) {
type fields struct {
Protocol string
Encoding string
Separator string
TagKeyRenameMap map[string]string
ProtocolKeyRenameMap map[string]string
}
type args struct {
groupEvents *models.PipelineGroupEvents
targetFields []string
}
mockFieldsWithSep := fields{
Protocol: ProtocolRaw,
Encoding: EncodingCustom,
Separator: "\r\n",
}
mockFieldsWithoutSep := fields{
Protocol: ProtocolRaw,
Encoding: EncodingJSON,
}
mockGroup := models.NewGroup(models.NewMetadataWithMap(map[string]string{"db": "test"}), nil)
mockByteEvent := []byte("cpu.load.short,host=server01,region=cn value=0.6")
tests := []struct {
name string
fields fields
args args
wantStream [][]byte
wantValues []map[string]string
wantErr assert.ErrorAssertionFunc
}{
{
name: "join with sep",
fields: mockFieldsWithSep,
args: args{
groupEvents: &models.PipelineGroupEvents{Group: mockGroup,
Events: []models.PipelineEvent{models.ByteArray(mockByteEvent), models.ByteArray(mockByteEvent)}},
targetFields: []string{"metadata.db"},
},
wantStream: [][]byte{append(append(mockByteEvent, "\r\n"...), mockByteEvent...)},
wantValues: []map[string]string{{"metadata.db": "test"}},
wantErr: assert.NoError,
}, {
name: "not join",
fields: mockFieldsWithoutSep,
args: args{
groupEvents: &models.PipelineGroupEvents{Group: mockGroup,
Events: []models.PipelineEvent{models.ByteArray(mockByteEvent), models.ByteArray(mockByteEvent)}},
targetFields: []string{"metadata.db"},
},
wantStream: [][]byte{mockByteEvent, mockByteEvent},
wantValues: []map[string]string{{"metadata.db": "test"}, {"metadata.db": "test"}},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Converter{
Protocol: tt.fields.Protocol,
Encoding: tt.fields.Encoding,
Separator: tt.fields.Separator,
TagKeyRenameMap: tt.fields.TagKeyRenameMap,
ProtocolKeyRenameMap: tt.fields.ProtocolKeyRenameMap,
}
2 changes: 1 addition & 1 deletion plugins/flusher/http/flusher_http.go
@@ -149,7 +149,7 @@ func (f *FlusherHTTP) Stop() error {
}

func (f *FlusherHTTP) getConverter() (*converter.Converter, error) {
    return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, nil, nil)
    return converter.NewConverterWithSep(f.Convert.Protocol, f.Convert.Encoding, f.Convert.Separator, nil, nil)
}

func (f *FlusherHTTP) addTask(log interface{}) {
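For context, a sketch of how a user-facing `Convert` block reaches the new constructor: the struct literal below mirrors the setup used by `flusher_http_test.go` further down (same package, so `FlusherHTTP`, `helper` and `converter` resolve as in that test file); the wrapper function name is hypothetical:

```go
// newRawHTTPFlusher is a sketch of a flusher whose Convert block selects the
// raw protocol and a "\n" separator. Init() later calls getConverter(), which
// now forwards Convert.Separator to converter.NewConverterWithSep.
func newRawHTTPFlusher() *FlusherHTTP {
    return &FlusherHTTP{
        RemoteURL: "http://test.com/write",
        Convert: helper.ConvertConfig{
            Protocol:  converter.ProtocolRaw,
            Encoding:  converter.EncodingCustom,
            Separator: "\n", // leave empty to send each event as its own request
        },
        Concurrency: 1,
        Query:       map[string]string{"db": "%{metadata.db}"},
    }
}
```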
89 changes: 86 additions & 3 deletions plugins/flusher/http/flusher_http_test.go
@@ -290,7 +290,7 @@ func TestHttpFlusherFlush(t *testing.T) {
}

func TestHttpFlusherExport(t *testing.T) {
Convey("Given a http flusher with protocol: Raw, encoding: custom, query: contains variable '%{metadata.db}'", t, func() {
Convey("Given a http flusher with Convert.Protocol: Raw, Convert.Encoding: Custom, Query: '%{metadata.db}'", t, func() {
var actualRequests []string
httpmock.Activate()
defer httpmock.DeactivateAndReset()
@@ -347,7 +347,7 @@
})
})

Convey("Export multiple byte events in one GroupEvents with Metadata {db: mydb}", func() {
Convey("Export multiple byte events in one GroupEvents with Metadata {db: mydb}, and ", func() {
groupEvents := models.PipelineGroupEvents{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric1),
@@ -358,7 +358,89 @@
So(err, ShouldBeNil)
flusher.Stop()

Convey("events in the same groupEvents should be send in one request", func() {
Convey("events in the same groupEvents should be send in individual request, when Convert.Separator is not set", func() {
reqCount := httpmock.GetTotalCallCount()
So(reqCount, ShouldEqual, 2)
})

Convey("request body should be valid", func() {
So(actualRequests, ShouldResemble, []string{
mockMetric1, mockMetric2,
})
})
})
})

Convey("Given a http flusher with Convert.Protocol: Raw, Convert.Encoding: Custom, Convert.Separator: '\n' ,Query: '%{metadata.db}'", t, func() {
var actualRequests []string
httpmock.Activate()
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("POST", "http://test.com/write?db=mydb", func(req *http.Request) (*http.Response, error) {
body, _ := ioutil.ReadAll(req.Body)
actualRequests = append(actualRequests, string(body))
return httpmock.NewStringResponse(200, "ok"), nil
})

flusher := &FlusherHTTP{
RemoteURL: "http://test.com/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolRaw,
Encoding: converter.EncodingCustom,
Separator: "\n",
},
Timeout: defaultTimeout,
Concurrency: 1,
Query: map[string]string{
"db": "%{metadata.db}",
},
}

err := flusher.Init(mock.NewEmptyContext("p", "l", "c"))
So(err, ShouldBeNil)

mockMetric1 := "cpu.load.short,host=server01,region=cn value=0.6 1672321328000000000"
mockMetric2 := "cpu.load.short,host=server01,region=cn value=0.2 1672321358000000000"
mockMetadata := models.NewMetadataWithKeyValues("db", "mydb")

Convey("Export a single byte events each GroupEvents with Metadata {db: mydb}", func() {
groupEventsArray := []*models.PipelineGroupEvents{
{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric1)},
},
{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric2)},
},
}
httpmock.ZeroCallCounters()
err := flusher.Export(groupEventsArray, nil)
So(err, ShouldBeNil)
flusher.Stop()

Convey("each GroupEvents should send in a single request", func() {
So(httpmock.GetTotalCallCount(), ShouldEqual, 2)
})
Convey("request body should by valid", func() {
So(actualRequests, ShouldResemble, []string{
mockMetric1, mockMetric2,
})
})
})

Convey("Export multiple byte events in one GroupEvents with Metadata {db: mydb}, and ", func() {
groupEvents := models.PipelineGroupEvents{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric1),
models.ByteArray(mockMetric2)},
}
httpmock.ZeroCallCounters()
err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
So(err, ShouldBeNil)
flusher.Stop()

Convey("events in the same groupEvents should be send in one request, when Convert.Separator is set", func() {
reqCount := httpmock.GetTotalCallCount()
So(reqCount, ShouldEqual, 1)
})
@@ -370,6 +452,7 @@
})
})
})

}

func TestHttpFlusherExportUnsupportedEventType(t *testing.T) {
