Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: tag fields rename #746

Merged
merged 5 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ your changes, such as:

## [Unreleased]

- [public] [both] [updated] Enable enable_env_ref_in_config configuration to support system variable binding
- [public] [both] [fixed] When using the TagFieldsRename configuration in flusher_kafka_v2/flusher_pulsar, some fields in tags cannot be renamed
- [public] [both] [added] add new plugin type: extension
- [public] [both] [updated] http flusher support custom authenticator, filter and request circuit-breaker via the extension plugin mechanism
- [public] [both] [updated] http flusher support custom authenticator, filter and request circuit-breaker via the extension plugin mechanism
2 changes: 2 additions & 0 deletions core/logtail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ DECLARE_FLAG_STRING(check_point_filename);
DECLARE_FLAG_STRING(default_buffer_file_path);
DECLARE_FLAG_STRING(ilogtail_docker_file_path_config);
DECLARE_FLAG_INT32(data_server_port);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);
shalousun marked this conversation as resolved.
Show resolved Hide resolved

void HandleSighupSignal(int signum, siginfo_t* info, void* context) {
ConfigManager::GetInstance()->SetMappingPathsChanged();
Expand Down Expand Up @@ -103,6 +104,7 @@ static void overwrite_community_edition_flags() {
STRING_FLAG(default_buffer_file_path) = "checkpoint";
STRING_FLAG(ilogtail_docker_file_path_config) = "checkpoint/docker_path_config.json";
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
}

// Main routine of worker process.
Expand Down
2 changes: 2 additions & 0 deletions core/logtail_windows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ DECLARE_FLAG_STRING(check_point_filename);
DECLARE_FLAG_STRING(default_buffer_file_path);
DECLARE_FLAG_STRING(ilogtail_docker_file_path_config);
DECLARE_FLAG_INT32(data_server_port);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);

static void overwrite_community_edition_flags() {
// support run in installation dir on default
Expand All @@ -60,6 +61,7 @@ static void overwrite_community_edition_flags() {
STRING_FLAG(default_buffer_file_path) = "checkpoint";
STRING_FLAG(ilogtail_docker_file_path_config) = "checkpoint/docker_path_config.json";
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
}

void do_worker_process() {
Expand Down
72 changes: 69 additions & 3 deletions docs/cn/data-pipeline/flusher/flusher-kafka_v2.md
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,87 @@ flushers:

### 动态topic

针对上面写入的这种日志格式,如果想根据`application`名称针对不用的应用推送到不通的`topic`,
针对上面写入的这种日志格式,如果想根据`application`名称针对不用的应用推送到不同的`topic`,
则`topic`可以这样配置。

```yaml
Topic: test_%{content.application}
```

最后`ilogtail`就自动将日志推送到`test_springboot-docker`这个`topic`中。

`topic`动态表达式规则:

- `%{content.fieldname}`。`content`代表从`contents`中取指定字段值
- `%{tag.fieldname}`,`tag`表示从`tags`中取指定字段值,例如:`%{tag.k8s.namespace.name}`
- `${env_name}`, 读取系统变量绑定到动态`topic`上,`ilogtail 1.5.0`开始支持。
- 其它方式暂不支持

#### 动态topic中使用系统变量
shalousun marked this conversation as resolved.
Show resolved Hide resolved

动态`topic`绑定系统变量的两种场景:
- 将系统变量采集添加到日志的`tag`中,然后使用`%{tag.fieldname}`规则完成绑定。
- 对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的`topic`中,直接采用`${env_name}`规则完成绑定,此方式需要`1.5.0`才支持。

由于上面提到的两种系统变量的采集绑定都需要做一些特殊配置,因此下面将分别介绍下相关的配置操作。


**(1)将系统变量采集到日志中完成动态`topic`绑定**

将系统变量采集添加到日志中有两种方式,一种是在`ilogtail`容器`env`添加,另一种是通过`processor_add_fields` 插件添加,
两种方式不同的配置参考下面的介绍

- 在`daemonset`或者`sidecar`方式部署的`ilogtail`容器`env`配置部分添加自定义的系统变量,配置参考案例如下:

```yaml
env:
- name: ALIYUN_LOG_ENV_TAGS # add log tags from env
value: _node_name_|_node_ip_|_app_name_
- name: _app_name_ # 添加自定义_app_name_变量,
value: kafka
```

自定义的变量`_app_name_`被添加到`ALIYUN_LOG_ENV_TAGS`中,日志的`tags`中会看到自定义的变量, 此时动态 `topic`采用`%{tag.fieldname}`规则配置即可。

- 使用`processor_add_fields` 插件系统变量添加到日志中,配置参考如下:

```yaml
processors:
- Type: processor_add_fields
Fields:
service: ${env_name}
IgnoreIfExist: false
```

这里`${env_name}`生效依赖于`ilogtail`的`enable_env_ref_in_config`配置,从`ilogtail 1.5.0`开始支持。

**(2)直接采用`$`符将系统变量绑定动态`topic`中**

在`daemonset`或者`sidecar`方式部署的`ilogtail`容器`env`配置部分添加自定义的系统变量,配置参考案例如下:
```yaml
env:
- name: ALIYUN_LOG_ENV_TAGS # add log tags from env
value: _node_name_|_node_ip_
- name: app_name # 添加自定义app_name变量,
value: kafka
```
`app_name`添加到系统变量中后,直接采用动态topic的:`${env_name}`规则即可绑定。

```yaml
enable: true
inputs:
- Type: file_log
LogPath: /home/test_log
FilePattern: "*.log"
flushers:
- Type: flusher_kafka
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Topic: ilogtail_${app_name}
shalousun marked this conversation as resolved.
Show resolved Hide resolved
```
- `${app_name}`就是我们上面添加的系统变量。


### TagFieldsRename

例如将`tags`中的`host.name`重命名为`hostname`,配置参考如下:
Expand Down
26 changes: 13 additions & 13 deletions pkg/protocol/converter/converter.go
shalousun marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func convertLogToMap(log *protocol.Log, logTags []*protocol.LogTag, src, topic s
for _, logContent := range log.Contents {
switch logContent.Key {
case "__log_topic__":
tags[tagLogTopic] = logContent.Value
addTagIfRequired(tags, tagKeyRenameMap, tagLogTopic, logContent.Value)
case tagPrefix + "__user_defined_id__":
continue
default:
Expand All @@ -222,11 +222,7 @@ func convertLogToMap(log *protocol.Log, logTags []*protocol.LogTag, src, topic s
}
}
if len(tagName) != 0 {
if newTagName, ok := tagKeyRenameMap[tagName]; ok && len(newTagName) != 0 {
tags[newTagName] = logContent.Value
} else if !ok {
tags[tagName] = logContent.Value
}
addTagIfRequired(tags, tagKeyRenameMap, tagName, logContent.Value)
} else {
contents[logContent.Key] = logContent.Value
}
Expand All @@ -244,16 +240,12 @@ func convertLogToMap(log *protocol.Log, logTags []*protocol.LogTag, src, topic s
} else if _, ok := tagConversionMap[logTag.Key]; ok {
tagName = tagConversionMap[logTag.Key]
}
if newTagName, ok := tagKeyRenameMap[tagName]; ok && len(newTagName) != 0 {
tags[newTagName] = logTag.Value
} else if !ok {
tags[tagName] = logTag.Value
}
addTagIfRequired(tags, tagKeyRenameMap, tagName, logTag.Value)
}

tags[tagHostIP] = src
addTagIfRequired(tags, tagKeyRenameMap, tagHostIP, src)
if topic != "" {
tags[tagLogTopic] = topic
addTagIfRequired(tags, tagKeyRenameMap, tagLogTopic, topic)
}

return contents, tags
Expand Down Expand Up @@ -283,3 +275,11 @@ func findTargetValues(targetFields []string, contents, tags, tagKeyRenameMap map
}
return desiredValue, nil
}

func addTagIfRequired(tags, tagKeyRenameMap map[string]string, key, value string) {
if newKey, ok := tagKeyRenameMap[key]; ok && len(newKey) != 0 {
tags[newKey] = value
} else if !ok {
tags[key] = value
}
}