Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metadata aggregator #654

Merged
merged 6 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions docs/cn/data-pipeline/aggregator/aggregator-metadata-group.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,35 @@

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
|------------------|----------| -------- |--------------------------------------------------------------|
| Type | String | 是 | 插件类型,指定为`aggregator_metadata_group`。 |
| GroupMetadataKeys | []String | 是 | 指定需要按照其值分组的Key列表。 |
| GroupMaxEventLength | int | 否 | 聚合时,单个PipelineGroupEvents中的最大Events数量,默认1024 |
| GroupMaxByteLength | int | 否 | 聚合时,单个PipelineGroupEvents中Events总的字节长度,仅支持ByteArray类型,默认3MiB |
| 参数 | 类型 | 是否必选 | 说明 |
|----------------------|----------|------|--------------------------------------------------------------|
| Type | String | 是 | 插件类型,指定为`aggregator_metadata_group`。 |
| GroupMetadataKeys | []String | 否 | 指定需要按照其值分组的Key列表,为空时表示按空Key进行打包聚合。 |
| GroupMaxEventLength | int | 否 | 聚合时,单个PipelineGroupEvents中的最大Events数量,默认1024 |
| GroupMaxByteLength | int | 否 | 聚合时,单个PipelineGroupEvents中Events总的字节长度,仅支持ByteArray类型,默认3MiB |
| OversizeDirectOutput | Boolean | 否 | 遇到单个PipelineGroupEvent字节长度超过上限时是否直接输出,默认为否,此时仅记录本地Error日志 |

## 样例

采集`service_http_server`输入插件获取的字节流,使用`GroupMetadataKeys`指定Metadata中用于聚合的字段,聚合后将采集结果通过http请求发送到指定目的。本样例中需要使用v2版本插件。

在样例中,首先通过input插件的配置`QueryParams`定义Metadata的数据来源,然后才能在aggregator插件中使用Metadata Key进行聚合,相同`db`的Events会被放在同一个PipelineGroupEvents向后传递。在flusher_http中,每个PipelineGroupEvents中的所有Events会在一次请求中输出。
采集`service_http_server`输入插件获取的字节流,使用`GroupMetadataKeys`
指定Metadata中用于聚合的字段,聚合后将采集结果通过http请求发送到指定目的。本样例中需要使用v2版本插件。

在样例中,首先通过input插件的配置`QueryParams`定义Metadata的数据来源,然后才能在aggregator插件中使用Metadata
Key进行聚合,相同`db`的Events会被放在同一个PipelineGroupEvents向后传递。在flusher_http中,每个PipelineGroupEvents中的所有Events会在一次请求中输出。

* 输入

```bash
curl --request POST 'http://127.0.0.1:12345?db=mydb' --data 'test,host=server01,region=cn value=0.1'
```


* 采集配置

```yaml
enable: true
version: "v2"
inputs:
- Type: service_http_server
- Type: service_http_server
Format: raw
Address: "http://127.0.0.1:12345"
QueryParams:
Expand Down
164 changes: 119 additions & 45 deletions plugins/aggregator/metadatagroup/aggregator_metadata_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metadatagroup

import (
"fmt"
"github.com/alibaba/ilogtail/pkg/logger"
"strings"
"sync"

Expand All @@ -31,44 +32,130 @@ const (
)

type metadataGroup struct {
group *models.GroupInfo
events []models.PipelineEvent
context pipeline.Context
group *models.GroupInfo
events []models.PipelineEvent

nowEventsLength int
nowEventsByteLength int
maxEventsLength int
maxEventsByteLength int
nowEventsLength int
nowEventsByteLength int
maxEventsLength int
maxEventsByteLength int
oversizeDirectOutput bool
yyuuttaaoo marked this conversation as resolved.
Show resolved Hide resolved

lock *sync.Mutex
}

func (g *metadataGroup) Record(group *models.PipelineGroupEvents, ctx pipeline.PipelineContext) error {
g.lock.Lock()
defer g.lock.Unlock()
eventsLen := len(group.Events)
bytesLen := g.evaluateByteLength(group.Events)

// when current events is full, make a quick flush.
if len(g.events) > 0 && (g.nowEventsLength+eventsLen > g.maxEventsLength || g.nowEventsByteLength+bytesLen > g.maxEventsByteLength) {
ctx.Collector().Collect(g.group, g.events...)
// reset length
g.Reset()
// check by bytes size, currently only works at `models.ByteArray`
t := group.Events[0].GetType()
switch t {
case models.EventTypeByteArray:
g.collectByLengthAndBytesChecker(group, ctx)
default:
g.collectByLengthChecker(group, ctx)
}

// add events
g.nowEventsLength += eventsLen
g.nowEventsByteLength += bytesLen
g.events = append(g.events, group.Events...)
return nil
}

func (g *metadataGroup) collectByLengthChecker(group *models.PipelineGroupEvents, ctx pipeline.PipelineContext) {
for {
if len(group.Events) == 0 {
break
}
inputSize := len(group.Events)
availableSize := g.maxEventsLength - g.nowEventsLength
if availableSize < 0 {
logger.Error(g.context.GetRuntimeContext(), "RUNTIME_ALARM", "availableSize is negative")
_ = g.GetResultWithoutLock(ctx)
g.Reset()
EvanLjp marked this conversation as resolved.
Show resolved Hide resolved
continue
}

if availableSize >= inputSize {
g.events = append(g.events, group.Events...)
g.nowEventsLength += inputSize
break
} else {
g.events = append(g.events, group.Events[0:availableSize]...)
_ = g.GetResultWithoutLock(ctx)
group.Events = group.Events[availableSize:]
}
}

}

func (g *metadataGroup) collectByLengthAndBytesChecker(group *models.PipelineGroupEvents, ctx pipeline.PipelineContext) {
for {
if len(group.Events) == 0 {
break
}
availableBytesSize := g.maxEventsByteLength - g.nowEventsByteLength
availableLenSize := g.maxEventsLength - g.nowEventsLength
if availableLenSize < 0 || availableBytesSize < 0 {
logger.Error(g.context.GetRuntimeContext(), "RUNTIME_ALARM", "availableSize or availableLength is negative")
_ = g.GetResultWithoutLock(ctx)
g.Reset()
yyuuttaaoo marked this conversation as resolved.
Show resolved Hide resolved
continue
}

bytes := 0
num := 0
oversize := false
for _, event := range group.Events {
byteArray, ok := event.(models.ByteArray)
if !ok {
continue
}
if bytes+len(byteArray) > availableBytesSize || num+1 > availableLenSize {
break
}
bytes += len(byteArray)
num++
}
if len(g.events) == 0 && num == 0 {
// means the first event oversize the maximum group size
if g.oversizeDirectOutput {
num = 1
oversize = true
} else {
logger.Errorf(g.context.GetRuntimeContext(), "AGGREGATE_OVERSIZE_ALARM", "the first event[%s] size is over the limit size %d, the event would be dropped",
yyuuttaaoo marked this conversation as resolved.
Show resolved Hide resolved
group.Events[0].GetName(),
g.maxEventsByteLength)
group.Events = group.Events[1:]
continue
}

}

if num >= len(group.Events) {
g.events = append(g.events, group.Events...)
g.nowEventsByteLength += bytes
g.nowEventsLength += num
if oversize {
_ = g.GetResultWithoutLock(ctx)
}
break
} else {
g.events = append(g.events, group.Events[0:num]...)
_ = g.GetResultWithoutLock(ctx)
group.Events = group.Events[num:]
}
}

}

// GetResult flushes the currently buffered events to ctx while holding the
// group lock; safe to call concurrently with Record.
func (g *metadataGroup) GetResult(ctx pipeline.PipelineContext) error {
	g.lock.Lock()
	defer g.lock.Unlock()
	return g.GetResultWithoutLock(ctx)
}

func (g *metadataGroup) GetResultWithoutLock(ctx pipeline.PipelineContext) error {
if len(g.events) == 0 {
return nil
}

// reset
ctx.Collector().Collect(g.group, g.events...)
g.Reset()
Expand All @@ -81,33 +168,18 @@ func (g *metadataGroup) Reset() {
g.events = make([]models.PipelineEvent, 0, g.maxEventsLength)
}

// evaluateByteLength sums the byte sizes of all ByteArray events in the
// slice; events of any other type contribute nothing to the total.
func (g *metadataGroup) evaluateByteLength(events []models.PipelineEvent) int {
	total := 0
	for _, e := range events {
		if byteArray, ok := e.(models.ByteArray); ok {
			total += len(byteArray)
		}
	}
	return total
}

type AggregatorMetadataGroup struct {
GroupMetadataKeys []string `json:"GroupMetadataKeys,omitempty" comment:"group by metadata keys"`
GroupMaxEventLength int `json:"GroupMaxEventLength,omitempty" comment:"max count of events in a pipelineGroupEvents"`
GroupMaxByteLength int `json:"GroupMaxByteLength,omitempty" comment:"max sum of byte length of events in a pipelineGroupEvents"`
context pipeline.Context
GroupMetadataKeys []string `json:"GroupMetadataKeys,omitempty" comment:"group by metadata keys"`
GroupMaxEventLength int `json:"GroupMaxEventLength,omitempty" comment:"max count of events in a pipelineGroupEvents"`
GroupMaxByteLength int `json:"GroupMaxByteLength,omitempty" comment:"max sum of byte length of events in a pipelineGroupEvents"`
OversizeDirectOutput bool `json:"OversizeDirectOutput,omitempty" comment:"direct output when the input event over GroupMaxByteLength"`

groupAgg sync.Map
}

func (g *AggregatorMetadataGroup) Init(context pipeline.Context, que pipeline.LogGroupQueue) (int, error) {
if len(g.GroupMetadataKeys) == 0 {
return 0, fmt.Errorf("must specify GroupMetadataKeys")
}
g.context = context
if g.GroupMaxEventLength <= 0 {
return 0, fmt.Errorf("unknown GroupMaxEventLength")
}
Expand Down Expand Up @@ -146,10 +218,12 @@ func (g *AggregatorMetadataGroup) getOrCreateMetadataGroup(event *models.Pipelin
group, ok := g.groupAgg.Load(aggKey)
if !ok {
newGroup := &metadataGroup{
group: &models.GroupInfo{Metadata: metadata},
maxEventsLength: g.GroupMaxEventLength,
maxEventsByteLength: g.GroupMaxByteLength,
lock: &sync.Mutex{},
group: &models.GroupInfo{Metadata: metadata},
maxEventsLength: g.GroupMaxEventLength,
maxEventsByteLength: g.GroupMaxByteLength,
oversizeDirectOutput: g.OversizeDirectOutput,
lock: &sync.Mutex{},
context: g.context,
}
group, _ = g.groupAgg.LoadOrStore(aggKey, newGroup)
}
Expand Down
97 changes: 94 additions & 3 deletions plugins/aggregator/metadatagroup/aggregator_metadata_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"testing"

. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/require"

"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/plugins/test/mock"
)

const (
Expand All @@ -32,7 +34,7 @@ func TestInitMetadataGroupAggregator(t *testing.T) {
agg := NewAggregatorMetadataGroup()
Convey("When Init(), should return error", func() {
_, err := agg.Init(nil, nil)
So(err, ShouldNotBeNil)
So(err, ShouldBeNil)
})
})

Expand Down Expand Up @@ -215,9 +217,9 @@ func TestMetadataGroupAggregatorRecordWithNonexistentKey(t *testing.T) {
func generateByteArrayEvents(count int, meta map[string]string) []*models.PipelineGroupEvents {
groupEventsArray := make([]*models.PipelineGroupEvents, 0, count)
event := models.ByteArray(RawData)
groupEvents := &models.PipelineGroupEvents{Group: models.NewGroup(models.NewMetadataWithMap(meta), nil),
Events: []models.PipelineEvent{event}}
for i := 0; i < count; i++ {
groupEvents := &models.PipelineGroupEvents{Group: models.NewGroup(models.NewMetadataWithMap(meta), nil),
Events: []models.PipelineEvent{event}}
groupEventsArray = append(groupEventsArray, groupEvents)
}
return groupEventsArray
Expand Down Expand Up @@ -260,3 +262,92 @@ func TestMetadataGroupAggregatorGetResult(t *testing.T) {

})
}

// TestMetadataGroupGroup_Record_Directly verifies that recording more events
// than GroupMaxEventLength immediately flushes full batches: 104 events with
// a batch size of 5 yield 20 full groups (the trailing 4 events stay
// buffered until an explicit GetResult).
func TestMetadataGroupGroup_Record_Directly(t *testing.T) {
	p := new(AggregatorMetadataGroup)
	p.GroupMaxEventLength = 5
	ctx := pipeline.NewObservePipelineConext(100)
	// Initialize like the sibling tests so the aggregator carries a runtime
	// context (used by its internal error-logging paths).
	p.Init(mock.NewEmptyContext("a", "b", "c"), nil)

	err := p.Record(constructEvents(104, map[string]string{}, map[string]string{}), ctx)
	require.NoError(t, err)
	require.Equal(t, 20, len(ctx.Collector().ToArray()))
}

// TestMetadataGroupGroup_Record_Tag checks that grouping by a metadata key
// flushes full batches and that each flushed group carries exactly the
// grouped metadata key/value pair.
func TestMetadataGroupGroup_Record_Tag(t *testing.T) {
	agg := new(AggregatorMetadataGroup)
	agg.GroupMaxEventLength = 5
	agg.GroupMetadataKeys = []string{"meta"}
	input := constructEvents(104, map[string]string{
		"meta": "metaval",
	}, map[string]string{
		"tag": "tagval",
	})
	ctx := pipeline.NewObservePipelineConext(100)
	agg.Init(mock.NewEmptyContext("a", "b", "c"), nil)
	require.NoError(t, agg.Record(input, ctx))
	groups := ctx.Collector().ToArray()
	require.Equal(t, 20, len(groups))
	require.Equal(t, 1, len(groups[0].Group.Metadata.Iterator()))
	require.Equal(t, "metaval", groups[0].Group.Metadata.Get("meta"))
}

// TestMetadataGroupGroup_Record_Timer verifies that when the batch limit is
// never reached Record flushes nothing, and a later GetResult emits all
// buffered events as a single group.
func TestMetadataGroupGroup_Record_Timer(t *testing.T) {
	agg := new(AggregatorMetadataGroup)
	agg.GroupMaxEventLength = 500
	ctx := pipeline.NewObservePipelineConext(100)
	agg.Init(mock.NewEmptyContext("a", "b", "c"), nil)
	require.NoError(t, agg.Record(constructEvents(104, map[string]string{}, map[string]string{}), ctx))
	require.Equal(t, 0, len(ctx.Collector().ToArray()))
	_ = agg.GetResult(ctx)
	flushed := ctx.Collector().ToArray()
	require.Equal(t, 1, len(flushed))
	require.Equal(t, 104, len(flushed[0].Events))
}

// TestMetadataGroupGroup_Oversize covers events whose byte size exceeds
// GroupMaxByteLength: they are dropped when OversizeDirectOutput is off,
// and emitted one group per event when it is on.
func TestMetadataGroupGroup_Oversize(t *testing.T) {
	newAgg := func(directOutput bool) *AggregatorMetadataGroup {
		agg := new(AggregatorMetadataGroup)
		agg.GroupMaxEventLength = 500
		agg.GroupMaxByteLength = 1
		agg.OversizeDirectOutput = directOutput
		agg.Init(mock.NewEmptyContext("a", "b", "c"), nil)
		return agg
	}
	meta := map[string]string{"a": "1", "b": "2", "c": "3"}

	t.Run("drop oversize events", func(t *testing.T) {
		agg := newAgg(false)
		ctx := pipeline.NewObservePipelineConext(100)
		for _, event := range generateByteArrayEvents(5, meta) {
			require.NoError(t, agg.Record(event, ctx))
		}
		require.Equal(t, 0, len(ctx.Collector().ToArray()))
		_ = agg.GetResult(ctx)
		require.Equal(t, 0, len(ctx.Collector().ToArray()))
	})

	t.Run("output oversize events directly", func(t *testing.T) {
		agg := newAgg(true)
		ctx := pipeline.NewObservePipelineConext(100)
		for _, event := range generateByteArrayEvents(5, meta) {
			require.NoError(t, agg.Record(event, ctx))
		}
		require.Equal(t, 5, len(ctx.Collector().ToArray()))
		_ = agg.GetResult(ctx)
		require.Equal(t, 0, len(ctx.Collector().ToArray()))
	})
}

// constructEvents builds a single PipelineGroupEvents carrying num empty
// Metric events under the given group metadata and tags.
func constructEvents(num int, meta, tags map[string]string) *models.PipelineGroupEvents {
	result := &models.PipelineGroupEvents{
		Group: models.NewGroup(models.NewMetadataWithMap(meta), models.NewTagsWithMap(tags)),
	}
	for i := 0; i < num; i++ {
		result.Events = append(result.Events, &models.Metric{})
	}
	return result
}