Processor regex replace (#757)
* add processor_fields_with_condition to support process actions after switch-case conditions … (#139)

* add processor_filter_compose_regex to support add fields after regex filter
pj1987111 authored Apr 5, 2023
1 parent 5518b55 commit 6340731
Showing 8 changed files with 823 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -41,3 +41,4 @@ your changes, such as:
- [public] [both] [added] add new plugin type: extension
- [public] [both] [updated] http flusher support custom authenticator, filter and request circuit-breaker via the extension plugin mechanism
- [public] [both] [added] add new plugin: flusher_loki
- [public] [both] [added] add new plugin: processor_string_replace
1 change: 1 addition & 0 deletions docs/cn/SUMMARY.md
@@ -73,6 +73,7 @@
* [Key-value pairs](data-pipeline/processor/processor-split-key-value.md)
* [Multi-line split](data-pipeline/processor/processor-split-log-regex.md)
* [Cloud platform metadata](data-pipeline/processor/processor-cloudmeta.md)
* [String replace](data-pipeline/processor/processor-string-replace.md)
* [Aggregators](data-pipeline/aggregator/README.md)
* [Base](data-pipeline/aggregator/aggregator-base.md)
* [Context](data-pipeline/aggregator/aggregator-context.md)
97 changes: 49 additions & 48 deletions docs/cn/data-pipeline/overview.md

Large diffs are not rendered by default.

201 changes: 201 additions & 0 deletions docs/cn/data-pipeline/processor/processor-string-replace.md
@@ -0,0 +1,201 @@
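# String replace

## Introduction

The `processor_string_replace` processor plugin replaces text in log fields by literal full-text matching, by regular-expression matching, or by removing escape sequences.

## Version

[Stable](../stability-level.md)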

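## Configuration parameters

| Parameter | Type | Required | Description |
| ------------ | -------- | ---- | ------------------------------------------------------------------------- |
| Type | String | Yes | Plugin type. |
| SourceKey | String | Yes | Name of the field to match. |
| Method | String | Yes | No default. Matching method, one of:<br>`const`: literal full-text replacement.<br>`regex`: replacement by regular-expression extraction.<br>`unquote`: remove escape characters. |
| Match | String | Yes, except for `unquote` | No default. The data to match.<br>`const`: the string to match; when several substrings match, all of them are replaced.<br>`regex`: the regular expression to match; when several substrings match, all of them are replaced, and capture groups can be used to target a specific group.<br>`unquote`: not needed. |
| ReplaceString | String | No | Default `""`. The replacement data.<br>`const`: the string that replaces each match.<br>`regex`: the string that replaces each match; group references are supported.<br>`unquote`: not needed. |
| DestKey | String | No | No default. New field in which the replaced value is stored; by default no new field is created and the source field is overwritten. |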

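## Examples

### Example 1: Full-text match and replace

Collect the `string_replace.log` file under `/home/test-log/` and test literal full-text matching and replacement of the log content.

* Input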

```bash
echo 'hello,how old are you? nice to meet you' >> /home/test-log/string_replace.log
```

* Collection configuration

```yaml
enable: true
inputs:
  - Type: file_log
    LogPath: /home/test-log/
    FilePattern: string_replace.log
processors:
  - Type: processor_string_replace
    SourceKey: content
    Method: const
    Match: 'how old are you?'
    ReplaceString: ''
flushers:
  - Type: flusher_sls
    Endpoint: cn-xxx.log.aliyuncs.com
    ProjectName: test_project
    LogstoreName: test_logstore
  - Type: flusher_stdout
    OnlyStdout: true
```

* Output

```json
{
  "__tag__:__path__": "/home/test-log/string_replace.log",
  "content": "hello, nice to meet you",
  "__time__": "1680353730"
}
```

### Example 2: Basic regex match and replace

Collect the `string_replace.log` file under `/home/test-log/` and test regex matching and replacement of the log content.

* Input

```bash
echo '2022-09-16 09:03:31.013 \u001b[32mINFO \u001b[0;39m \u001b[34m[TID: N/A]\u001b[0;39m [\u001b[35mThread-30\u001b[0;39m] \u001b[36mc.s.govern.polygonsync.job.BlockTask\u001b[0;39m : 区块采集------结束------\r' >> /home/test-log/string_replace.log
```

* Collection configuration

```yaml
enable: true
inputs:
  - Type: file_log
    LogPath: /home/test-log/
    FilePattern: string_replace.log
processors:
  - Type: processor_string_replace
    SourceKey: content
    Method: regex
    Match: \\u\w+\[\d{1,3};*\d{1,3}m|N/A
    ReplaceString: ''
flushers:
  - Type: flusher_sls
    Endpoint: cn-xxx.log.aliyuncs.com
    ProjectName: test_project
    LogstoreName: test_logstore
  - Type: flusher_stdout
    OnlyStdout: true
```

* Output

```json
{
  "__tag__:__path__": "/home/test-log/string_replace.log",
  "content": "2022-09-16 09:03:31.013 INFO [TID: ] [Thread-30] c.s.govern.polygonsync.job.BlockTask : 区块采集------结束------\r",
  "__time__": "1680353730"
}
```
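
The `Match` expression above can be tried outside iLogtail. The following is a minimal sketch under stated assumptions: it uses Go's standard `regexp` package as a stand-in for the `regexp2` library that the plugin compiles with, and `stripANSI` is a hypothetical helper name, not part of the plugin. For this pattern the two engines are expected to behave the same, but that is not verified against the plugin here.

```go
package main

import (
	"fmt"
	"regexp"
)

// ansiOrNA is the Match expression from the example: a literal "\u" escape
// followed by a colour code such as "[32m" or "[0;39m", or the text "N/A".
var ansiOrNA = regexp.MustCompile(`\\u\w+\[\d{1,3};*\d{1,3}m|N/A`)

// stripANSI is a hypothetical helper that mirrors Method: regex with
// ReplaceString: '' (every match is replaced by the empty string).
func stripANSI(s string) string {
	return ansiOrNA.ReplaceAllString(s, "")
}

func main() {
	in := `\u001b[32mINFO \u001b[0;39m [TID: N/A]`
	fmt.Printf("%q\n", stripANSI(in)) // prints "INFO  [TID: ]"
}
```

Because every match is removed rather than replaced by a space, whitespace that surrounded a removed sequence is kept as-is.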

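### Example 3: Match and replace by regex group, writing the result to a new field

Collect the `string_replace.log` file under `/home/test-log/` and test regex group matching and replacement of the log content.

Note: for group replacement, `ReplaceString` must not contain `{}`; groups can only be referenced as `$1`, `$2`, and so on.

* Input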

```bash
echo '10.10.239.16' >> /home/test-log/string_replace.log
```

* Collection configuration

```yaml
enable: true
inputs:
  - Type: file_log
    LogPath: /home/test-log/
    FilePattern: string_replace.log
processors:
  - Type: processor_string_replace
    SourceKey: content
    Method: regex
    Match: (\d.*\.)\d+
    ReplaceString: $1*/24
    DestKey: new_ip
flushers:
  - Type: flusher_sls
    Endpoint: cn-xxx.log.aliyuncs.com
    ProjectName: test_project
    LogstoreName: test_logstore
  - Type: flusher_stdout
    OnlyStdout: true
```

* Output

```json
{
  "__tag__:__path__": "/home/test-log/string_replace.log",
  "content": "10.10.239.16",
  "new_ip": "10.10.239.*/24",
  "__time__": "1680353730"
}
```
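
The group replacement in this example can be reproduced with a few lines of Go. This is a sketch only: it assumes Go's standard `regexp` package as a stand-in for the plugin's `regexp2` engine, and `maskLastOctet` is a hypothetical helper name. The `$1` reference is written without braces, in line with the note above.

```go
package main

import (
	"fmt"
	"regexp"
)

// maskLastOctet is a hypothetical helper that mirrors
// Match: (\d.*\.)\d+ with ReplaceString: $1*/24.
func maskLastOctet(ip string) string {
	re := regexp.MustCompile(`(\d.*\.)\d+`)
	// "$1" expands to the first capture group; the "*" that follows is not a
	// valid name character, so it ends the group reference.
	return re.ReplaceAllString(ip, "$1*/24")
}

func main() {
	fmt.Println(maskLastOctet("10.10.239.16")) // prints 10.10.239.*/24
}
```

The greedy `.*` inside the group consumes everything up to the last dot, which is why only the final octet is replaced.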

### Example 4: Replace escape sequences

Collect the `string_replace.log` file under `/home/test-log/` and test escape-character replacement.

* Input

```bash
echo '{\\x22UNAME\\x22:\\x22\\x22,\\x22GID\\x22:\\x22\\x22,\\x22PAID\\x22:\\x22\\x22,\\x22UUID\\x22:\\x22\\x22,\\x22STARTTIME\\x22:\\x22\\x22,\\x22ENDTIME\\x22:\\x22\\x22,\\x22UID\\x22:\\x222154212790\\x22,\\x22page_num\\x22:1,\\x22page_size\\x22:10}' >> /home/test-log/string_replace.log
echo '\\u554a\\u554a\\u554a' >> /home/test-log/string_replace.log
```

* Collection configuration

```yaml
enable: true
inputs:
  - Type: file_log
    LogPath: /home/test-log/
    FilePattern: string_replace.log
processors:
  - Type: processor_string_replace
    SourceKey: content
    Method: unquote
flushers:
  - Type: flusher_sls
    Endpoint: cn-xxx.log.aliyuncs.com
    ProjectName: test_project
    LogstoreName: test_logstore
  - Type: flusher_stdout
    OnlyStdout: true
```

* Output

```json
{
  "__tag__:__path__": "/home/test-log/string_replace.log",
  "content": "{\"UNAME\":\"\",\"GID\":\"\",\"PAID\":\"\",\"UUID\":\"\",\"STARTTIME\":\"\",\"ENDTIME\":\"\",\"UID\":\"2154212790\",\"page_num\":1,\"page_size\":10}",
  "__time__": "1680353730"
}
{
  "__tag__:__path__": "/home/test-log/string_replace.log",
  "content": "啊啊啊",
  "__time__": "1680353730"
}
```
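
The `unquote` method relies on Go's `strconv.Unquote`. The sketch below follows the same two branches as the plugin source in this commit; `unescape` is a hypothetical helper name, and the sample values are shortened versions of the inputs above that are assumed to reach the processor with single backslashes.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// unescape is a hypothetical helper following the plugin's unquote branch:
// a value already wrapped in double quotes is unquoted directly; otherwise
// inner double quotes are rewritten as \x22 and the value is wrapped first.
func unescape(v string) (string, error) {
	if strings.HasPrefix(v, `"`) && strings.HasSuffix(v, `"`) {
		return strconv.Unquote(v)
	}
	return strconv.Unquote(`"` + strings.ReplaceAll(v, `"`, `\x22`) + `"`)
}

func main() {
	for _, v := range []string{`{\x22UID\x22:\x222154212790\x22}`, `\u554a\u554a\u554a`} {
		out, err := unescape(v)
		fmt.Println(out, err)
	}
}
```

An input that is not a valid Go escape sequence makes `strconv.Unquote` return an error; in that case the plugin keeps the original value.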
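@@ -31,3 +31,9 @@ Processor development consists of the following steps:
5. Run unit tests or E2E tests; see [How to use unit tests](../test/unit-test.md) and [How to use E2E tests](../test/e2e-test.md).
6. Run *make lint* to check code style.
7. Submit a Pull Request.

## Processor admission performance requirements

*Baseline case*: 512 random characters as the content, with one complete execution of the processor's logic.

*Required processing speed*: at least 40,000 executions per second (4w/s).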
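
A rough way to check a candidate against this baseline is sketched below. It is an illustration only: `randomContent` and `opsPerSecond` are hypothetical helpers, and `strings.ReplaceAll` stands in for the processor logic. A real check would call the processor's `ProcessLogs`, typically from a Go benchmark.

```go
package main

import (
	"fmt"
	"math/rand"
	"strings"
	"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// randomContent returns n random ASCII characters (the 512-character base case).
func randomContent(n int) string {
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

// opsPerSecond runs fn the given number of times and reports the observed rate.
func opsPerSecond(runs int, fn func()) float64 {
	start := time.Now()
	for i := 0; i < runs; i++ {
		fn()
	}
	elapsed := time.Since(start).Seconds()
	if elapsed <= 0 { // guard against a zero reading on coarse clocks
		elapsed = 1e-9
	}
	return float64(runs) / elapsed
}

func main() {
	content := randomContent(512)
	rate := opsPerSecond(100000, func() {
		_ = strings.ReplaceAll(content, "abc", "") // stand-in for the processor logic
	})
	fmt.Printf("%.0f ops/s (threshold in the guide: 40000)\n", rate)
}
```

The measured rate depends on the machine, so it is meaningful only relative to the threshold on comparable hardware.
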
1 change: 1 addition & 0 deletions plugins.yml
@@ -100,6 +100,7 @@ plugins:
- import: "github.com/alibaba/ilogtail/plugins/processor/split/logstring"
- import: "github.com/alibaba/ilogtail/plugins/processor/split/string"
- import: "github.com/alibaba/ilogtail/plugins/processor/strptime"
- import: "github.com/alibaba/ilogtail/plugins/processor/stringreplace"
linux:
- import: "github.com/alibaba/ilogtail/plugins/flusher/sls"
- import: "github.com/alibaba/ilogtail/plugins/input/docker/logmeta"
131 changes: 131 additions & 0 deletions plugins/processor/stringreplace/processor_string_replace.go
@@ -0,0 +1,131 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stringreplace

import (
	"errors"
	"strconv"
	"strings"

	"github.com/dlclark/regexp2"

	"github.com/alibaba/ilogtail/pkg/helper"
	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/protocol"
)

type ProcessorStringReplace struct {
	SourceKey     string
	Method        string
	Match         string
	ReplaceString string
	DestKey       string

	re            *regexp2.Regexp
	context       pipeline.Context
	logPairMetric pipeline.CounterMetric
}

const (
	PluginName = "processor_string_replace"

	MethodRegex   = "regex"
	MethodConst   = "const"
	MethodUnquote = "unquote"
)

var errNoMethod = errors.New("no method error")
var errNoMatch = errors.New("no match error")
var errNoSourceKey = errors.New("no source key error")

// Init validates the configuration, compiles the regex when Method is
// "regex", and registers the plugin's metric.
func (p *ProcessorStringReplace) Init(context pipeline.Context) error {
	p.context = context
	if len(p.SourceKey) == 0 {
		return errNoSourceKey
	}
	var err error
	switch p.Method {
	case MethodConst:
		if len(p.Match) == 0 {
			return errNoMatch
		}
	case MethodRegex:
		p.re, err = regexp2.Compile(p.Match, regexp2.RE2)
		if err != nil {
			logger.Error(p.context.GetRuntimeContext(), "PROCESSOR_INIT_ALARM", "init regex error", err, "regex", p.Match)
			return err
		}
	case MethodUnquote:
		// unquote needs neither Match nor ReplaceString.
	default:
		return errNoMethod
	}

	p.logPairMetric = helper.NewAverageMetric("regex_replace_pairs_per_log")
	p.context.RegisterCounterMetric(p.logPairMetric)
	return nil
}

func (*ProcessorStringReplace) Description() string {
	return "regex replace processor for logtail"
}

// ProcessLogs applies the configured replacement to every SourceKey field and
// either overwrites that field or appends the result under DestKey.
func (p *ProcessorStringReplace) ProcessLogs(logArray []*protocol.Log) []*protocol.Log {
	replaceCount := 0
	for _, log := range logArray {
		for _, cont := range log.Contents {
			if p.SourceKey != cont.Key {
				continue
			}
			// Start from the original value so that a regex with no match
			// leaves the field unchanged instead of blanking it.
			newContVal := cont.Value
			var err error
			switch p.Method {
			case MethodConst:
				newContVal = strings.ReplaceAll(cont.Value, p.Match, p.ReplaceString)
			case MethodRegex:
				if ok, _ := p.re.MatchString(cont.Value); ok {
					newContVal, err = p.re.Replace(cont.Value, p.ReplaceString, -1, -1)
				}
			case MethodUnquote:
				if strings.HasPrefix(cont.Value, "\"") && strings.HasSuffix(cont.Value, "\"") {
					newContVal, err = strconv.Unquote(cont.Value)
				} else {
					// Wrap the bare value in quotes, escaping inner quotes as \x22,
					// so that strconv.Unquote accepts it.
					newContVal, err = strconv.Unquote("\"" + strings.ReplaceAll(cont.Value, "\"", "\\x22") + "\"")
				}
			default:
				newContVal = cont.Value
			}
			if err != nil {
				// On failure, log the error and keep the original value.
				logger.Error(p.context.GetRuntimeContext(), "PROCESSOR_INIT_ALARM", "process log error", err)
				newContVal = cont.Value
			}
			if len(p.DestKey) > 0 {
				log.Contents = append(log.Contents, &protocol.Log_Content{Key: p.DestKey, Value: newContVal})
			} else {
				cont.Value = newContVal
			}
			replaceCount++
		}
	}
	p.logPairMetric.Add(int64(replaceCount))
	return logArray
}

func init() {
	pipeline.Processors[PluginName] = func() pipeline.Processor {
		return &ProcessorStringReplace{}
	}
}
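
The effect of `DestKey` in `ProcessLogs` can be seen in isolation with the sketch below. It is a simplified illustration, not the plugin itself: `Content` is a hypothetical stand-in for `protocol.Log_Content`, `replaceConst` is a hypothetical helper, and only the `const` method is modelled.

```go
package main

import (
	"fmt"
	"strings"
)

// Content is a hypothetical stand-in for protocol.Log_Content.
type Content struct{ Key, Value string }

// replaceConst mimics the const method: when destKey is empty the source
// field is overwritten, otherwise the result is appended as a new field.
func replaceConst(fields []*Content, sourceKey, match, repl, destKey string) []*Content {
	// The range expression is evaluated once, so fields appended inside the
	// loop are not visited again.
	for _, f := range fields {
		if f.Key != sourceKey {
			continue
		}
		v := strings.ReplaceAll(f.Value, match, repl)
		if destKey != "" {
			fields = append(fields, &Content{Key: destKey, Value: v})
		} else {
			f.Value = v
		}
	}
	return fields
}

func main() {
	fields := []*Content{{Key: "content", Value: "hello,how old are you? nice to meet you"}}
	fields = replaceConst(fields, "content", "how old are you?", "", "cleaned")
	for _, f := range fields {
		fmt.Printf("%s=%q\n", f.Key, f.Value)
	}
}
```

With a non-empty `destKey` the source field keeps its original value, which matches the `content` and `new_ip` pair shown in Example 3 of the documentation.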