Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support elasticsearch dynamic index expression formatting #717

Merged
merged 5 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/cn/developer-guide/format-string/format-index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

# Elasticsearch动态索引格式化

开发插件往`Elasticsearch`写入数据时。用户往往存在一些动态索引的需求,动态索引和kafka的动态topic稍微有些区别。
动态索引常见的使用场景如下:
- 按照业务字段标识动态创建索引,和`Kafka`的动态`topic`一样。
- 基于时间的索引创建,按天、按周、按月等。在`ELK`社区例常见的案例:`%{+yyyy.MM.dd}`基于天创建,`%{+yyyy.ww}`基于周创建

结合两种需求,动态索引的创建规则如下:

- `%{content.fieldname}`。`content`代表从`contents`中取指定字段值
- `%{tag.fieldname}`,`tag`表示从`tags`中取指定字段值,例如:`%{tag.k8s.namespace.name}`
- `${env_name}`, 使用`$`符直接从环境变量中获取,从`1.5.0`版本支持
- `%{+yyyy.MM.dd}`,按天创建,注意表达式前面时间格式前面的`+`符号不能缺少。
- `%{+yyyy.ww}`,按周创建,还有其他时间格式就不一一举例。由于`golang`时间格式看起来不是非常人性化, 表达式使用上直接参考`ELK`社区的格式。

格式化动态索引的函数如下:
```go
func FormatIndex(targetValues map[string]string, indexPattern string, indexTimestamp uint32) (*string, error)
```
- `targetValues`使用Convert协议转换处理后需要替换的键值对。可参考`flusher_kafka_v2`中的对`ToByteStreamWithSelectedFields`的使用。
- `indexPattern`动态索引表达式。
- `indexTimestamp`和`Log`这个pb中的time字段类型一致方便可以使用日志中的时间来格式化动态索引。
41 changes: 41 additions & 0 deletions pkg/fmtstr/format_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fmtstr

import "time"

// FormatIndex name Elasticsearch index base on time like Logstash and Beats
func FormatIndex(targetValues map[string]string, indexPattern string, indexTimestamp uint32) (*string, error) {
shalousun marked this conversation as resolved.
Show resolved Hide resolved
sf, err := Compile(indexPattern, func(key string, ops []VariableOp) (FormatEvaler, error) {
// with timestamp expression, like %{+yyyyMM}
if key[0] == '+' {
indexTime := time.Unix(int64(indexTimestamp), 0)
index := FormatTimestamp(&indexTime, key[1:])
return StringElement{S: index}, nil
}
if value, ok := targetValues[key]; ok {
return StringElement{S: value}, nil
}
return StringElement{S: key}, nil
})
if err != nil {
return nil, err
}
topic, err := sf.Run(nil)
if err != nil {
return nil, err
}
return &topic, nil
}
41 changes: 41 additions & 0 deletions pkg/fmtstr/format_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fmtstr

import (
"fmt"
"testing"
"time"

"github.com/smartystreets/goconvey/convey"
)

func TestFormatIndex(t *testing.T) {
convey.Convey("elasticsearch index format ", t, func() {
nowTime := time.Now().Local()
targetValues := map[string]string{
"app": "ilogtail",
}
elasticsearchIndex := "test_%{app}_%{+yyyy.ww}" // Generate index weekly
actualIndex, _ := FormatIndex(targetValues, elasticsearchIndex, uint32(nowTime.Unix()))
desiredIndex := fmt.Sprintf("test_ilogtail_%s.%d", nowTime.Format("2006"), GetWeek(&nowTime))
convey.So(*actualIndex, convey.ShouldEqual, desiredIndex)

elasticsearchIndexDaily := "test_%{app}_%{+yyyyMMdd}" // Generate index daily
actualIndex, _ = FormatIndex(targetValues, elasticsearchIndexDaily, uint32(nowTime.Unix()))
desiredIndex = fmt.Sprintf("test_ilogtail_%s", nowTime.Format("20060102"))
convey.So(*actualIndex, convey.ShouldEqual, desiredIndex)
})
}
102 changes: 102 additions & 0 deletions pkg/fmtstr/format_timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fmtstr

import (
"sort"
"strconv"
"strings"
"time"
)

func newTimestampFormatMapping(goFormat, generalFormat string) *TimestampFormatMapping {
return &TimestampFormatMapping{
GoFormat: goFormat,
GeneralFormat: generalFormat,
}
}

type TimestampFormatMapping struct {
GoFormat string
GeneralFormat string
}

func newTimestampFormatMappings() []*TimestampFormatMapping {
return []*TimestampFormatMapping{
newTimestampFormatMapping("January", "MMMM"),
newTimestampFormatMapping("Jan", "MMM"),
newTimestampFormatMapping("1", "M"),
newTimestampFormatMapping("01", "MM"),
newTimestampFormatMapping("Monday", "EEEE"),
newTimestampFormatMapping("Mon", "EEE"),
newTimestampFormatMapping("2", "d"),
newTimestampFormatMapping("02", "dd"),
newTimestampFormatMapping("15", "HH"),
newTimestampFormatMapping("3", "K"),
newTimestampFormatMapping("03", "KK"),
newTimestampFormatMapping("4", "m"),
newTimestampFormatMapping("04", "mm"),
newTimestampFormatMapping("5", "s"),
newTimestampFormatMapping("05", "ss"),
newTimestampFormatMapping("2006", "yyyy"),
newTimestampFormatMapping("06", "yy"),
newTimestampFormatMapping("PM", "aa"),
newTimestampFormatMapping("MST", "Z"),
newTimestampFormatMapping("Z0700", "'Z'XX"),
newTimestampFormatMapping("Z070000", "'Z'XX"),
newTimestampFormatMapping("Z07", "'Z'X"),
newTimestampFormatMapping("Z07:00", "'Z'XXX"),
newTimestampFormatMapping("Z07:00:00", "'Z'XXX"),
newTimestampFormatMapping("-0700", "XX"),
newTimestampFormatMapping("-07", "X"),
newTimestampFormatMapping("-07:00", "XXX"),
newTimestampFormatMapping("999999999", "SSS"),
}
}

func FormatTimestamp(t *time.Time, format string) string {
mappings := newTimestampFormatMappings()
goFormat := GeneralToGoFormat(mappings, format)
// format with week
if strings.Contains(goFormat, "ww") {
weekNumber := GetWeek(t)
formattedTime := t.Format(goFormat)
return strings.ReplaceAll(formattedTime, "ww", strconv.Itoa(weekNumber))
}
return t.Format(goFormat)
}

func GetWeek(t *time.Time) int {
_, week := t.ISOWeek()
return week
}

func GeneralToGoFormat(mappings []*TimestampFormatMapping, format string) string {
// After sorting, avoid replacement errors when using replace later
sort.Slice(mappings, func(i, j int) bool {
return len(mappings[i].GeneralFormat) > len(mappings[j].GeneralFormat)
})
for _, m := range mappings {
if m.GeneralFormat == "M" && (strings.Contains(format, "Mon") ||
strings.Contains(format, "AM") || strings.Contains(format, "PM")) {
continue
}
if m.GeneralFormat == "d" && strings.Contains(format, "Monday") {
continue
}
format = strings.ReplaceAll(format, m.GeneralFormat, m.GoFormat)
}
return format
}
34 changes: 34 additions & 0 deletions pkg/fmtstr/format_timestamp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fmtstr

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestFormatTime(t *testing.T) {
someTime := time.Now().Local()
currentStrTime := FormatTimestamp(&someTime, "yyyyMMdd")
expectTime := time.Now().Format("20060102")
assert.Equal(t, expectTime, currentStrTime)

currentWeekNumber := FormatTimestamp(&someTime, "yyyy.ww")
expectWeekNumber := fmt.Sprintf("%s.%d", someTime.Format("2006"), GetWeek(&someTime))
assert.Equal(t, expectWeekNumber, currentWeekNumber)
}