Skip to content

Commit

Permalink
feat: object支持别名配置 (#4851)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzz833708 authored Jan 17, 2025
1 parent 92f0775 commit 4ce8299
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 6 deletions.
14 changes: 13 additions & 1 deletion bklog/apps/grafana/handlers/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
from django.conf import settings
from django.utils.translation import ugettext_lazy as _

from apps.api import CCApi
from apps.api import CCApi, TransferApi
from apps.grafana.constants import (
CMDB_EXTEND_FIELDS,
LOG_SEARCH_DIMENSION_LIST,
TIME_SERIES_FIELD_TYPE,
)
from apps.iam import ActionEnum, Permission, ResourceEnum
from apps.log_databus.models import CollectorConfig
from apps.log_desensitize.handlers.desensitize import DesensitizeHandler
from apps.log_desensitize.models import DesensitizeFieldConfig
from apps.log_search.constants import GlobalCategoriesEnum
Expand Down Expand Up @@ -448,11 +449,22 @@ def get_metric_list(self, category_id=None):
"dimension_fields": [],
}

alias_dict = dict()
collector_config = CollectorConfig.objects.filter(index_set_id=index_set.index_set_id).first()
if collector_config:
data = TransferApi.get_result_table({"table_id": collector_config.table_id})
alias_dict = data.get("query_alias_settings", dict())

for field_info in fields.get("fields", []):
field_id = field_description = field_info["field_name"]
if field_info.get("description"):
field_description = field_info["description"]

for alias_name, info in alias_dict.items():
# 有配置别名,修改name
if field_id == info.get("path"):
field_description = alias_name

if field_info["es_doc_values"] and field_info.get("field_type") != "date":
metric_conf["dimension_fields"].append({"id": field_id, "name": field_description})

Expand Down
1 change: 1 addition & 0 deletions bklog/apps/log_databus/handlers/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def create_or_update(self, params: dict):
"etl_params": params["etl_params"],
"etl_fields": params["etl_fields"],
"bk_biz_id": params["bk_biz_id"],
"alias_settings": params["alias_settings"]
}
if params.get("visible_type"):
model_fields["visible_type"] = params["visible_type"]
Expand Down
5 changes: 5 additions & 0 deletions bklog/apps/log_databus/handlers/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,11 @@ def retrieve(self, use_request=True):
for process in self.RETRIEVE_CHAIN:
collector_config = getattr(self, process, lambda x, y: x)(collector_config, context)
logger.info(f"[databus retrieve] process => [{process}] collector_config => [{collector_config}]")
if self.data.table_id:
result_table = TransferApi.get_result_table({"table_id": self.data.table_id})
alias_dict = result_table.get("query_alias_settings", dict())
if alias_dict:
collector_config.update({"alias_settings": alias_dict})

# 添加索引集相关信息
log_index_set_obj = LogIndexSet.objects.filter(collector_config_id=self.collector_config_id).first()
Expand Down
2 changes: 2 additions & 0 deletions bklog/apps/log_databus/handlers/etl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def update_or_create(
etl_params=None,
fields=None,
username="",
alias_settings=None,
):
# 停止状态下不能编辑
if self.data and not self.data.is_active:
Expand Down Expand Up @@ -219,6 +220,7 @@ def update_or_create(
etl_params=etl_params,
es_version=cluster_info["cluster_config"]["version"],
hot_warm_config=cluster_info["cluster_config"].get("custom_option", {}).get("hot_warm_config"),
alias_settings=alias_settings,
)

if not view_roles:
Expand Down
2 changes: 2 additions & 0 deletions bklog/apps/log_databus/handlers/etl/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def update_or_create(
sort_fields=None,
target_fields=None,
username="",
alias_settings=None,
*args,
**kwargs,
):
Expand Down Expand Up @@ -123,6 +124,7 @@ def update_or_create(
es_version=cluster_info["cluster_config"]["version"],
hot_warm_config=cluster_info["cluster_config"].get("custom_option", {}).get("hot_warm_config"),
es_shards=es_shards,
alias_settings=alias_settings,
)

if not view_roles:
Expand Down
14 changes: 14 additions & 0 deletions bklog/apps/log_databus/handlers/etl_storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def update_or_create_result_table(
hot_warm_config: dict = None,
es_shards: int = settings.ES_SHARDS,
index_settings: dict = None,
alias_settings: list = None,
):
"""
创建或更新结果表
Expand All @@ -440,6 +441,7 @@ def update_or_create_result_table(
:param hot_warm_config: 冷热数据配置
:param es_shards: es分片数
:param index_settings: 索引配置
:param alias_settings: 别名配置
"""
from apps.log_databus.handlers.collector import build_result_table_id

Expand Down Expand Up @@ -561,6 +563,18 @@ def update_or_create_result_table(
if "es_type" in field.get("option", {}) and field["option"]["es_type"] in ["text"]:
field["option"]["es_norms"] = False

# 别名配置
if alias_settings:
query_alias_settings = []
for item in alias_settings:
field_alias = {
"field_name": item["field_name"],
"query_alias": item["query_alias"],
"path_type": item["path_type"],
}
query_alias_settings.append(field_alias)
params.update({"query_alias_settings": query_alias_settings})

# 时间默认为维度
if "time_option" in params and "es_doc_values" in params["time_option"]:
del params["time_option"]["es_doc_values"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.15 on 2024-12-17 03:37

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('log_databus', '0041_auto_20240830_1524'),
]

operations = [
migrations.AddField(
model_name='cleantemplate',
name='alias_settings',
field=models.JSONField(blank=True, null=True, verbose_name='别名配置'),
),
]
1 change: 1 addition & 0 deletions bklog/apps/log_databus/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ class CleanTemplate(SoftDeleteModel):
bk_biz_id = models.IntegerField(_("业务id"))
visible_type = models.CharField(_("可见类型"), max_length=64, default=VisibleEnum.CURRENT_BIZ.value)
visible_bk_biz_id = MultiStrSplitByCommaFieldText(_("可见业务ID"), default="")
alias_settings = models.JSONField(_("别名配置"), null=True, blank=True)

class Meta:
verbose_name = _("清洗模板")
Expand Down
9 changes: 9 additions & 0 deletions bklog/apps/log_databus/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,12 @@ class AssessmentConfig(serializers.Serializer):
approvals = serializers.ListField(label=_("审批人"), child=serializers.CharField(), required=True)


class AliasSettingSerializer(serializers.Serializer):
field_name = serializers.CharField(label=_("原字段名"), required=True)
query_alias = serializers.CharField(label=_("别名"), required=True)
path_type = serializers.CharField(label=_("字段类型"), required=True)


class CollectorEtlStorageSerializer(CollectorETLParamsFieldSerializer):
table_id = serializers.CharField(label=_("结果表ID"), required=True)
etl_config = serializers.CharField(label=_("清洗类型"), required=True)
Expand All @@ -890,6 +896,7 @@ class CollectorEtlStorageSerializer(CollectorETLParamsFieldSerializer):
view_roles = serializers.ListField(label=_("查看权限"), required=False, default=[])
need_assessment = serializers.BooleanField(label=_("是否需要评估配置"), required=False, default=False)
assessment_config = AssessmentConfig(label=_("评估配置"), required=False)
alias_settings = AliasSettingSerializer(many=True, required=False, default=list)

def validate(self, attrs):
attrs = super().validate(attrs)
Expand Down Expand Up @@ -1003,6 +1010,7 @@ class CleanTemplateSerializer(serializers.Serializer):
bk_biz_id = serializers.IntegerField(label=_("业务id"), required=True)
visible_type = serializers.CharField(label=_("可见类型"), required=False)
visible_bk_biz_id = serializers.ListField(label=_("可见业务ID"), required=False)
alias_settings = serializers.ListField(child=serializers.DictField(), label=_("别名配置"), required=False, default=list)


class CleanTemplateDestroySerializer(serializers.Serializer):
Expand Down Expand Up @@ -1580,6 +1588,7 @@ class FastCollectorUpdateSerializer(CollectorETLParamsFieldSerializer):
allocation_min_days = serializers.IntegerField(label=_("冷热数据生效时间"), required=False)
storage_replies = serializers.IntegerField(label=_("ES副本数量"), required=False, min_value=0)
es_shards = serializers.IntegerField(label=_("ES分片数量"), required=False, min_value=1)
alias_settings = AliasSettingSerializer(many=True, required=False, default=list)

def validate(self, attrs):
attrs = super().validate(attrs)
Expand Down
42 changes: 39 additions & 3 deletions bklog/apps/log_databus/views/clean_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,19 @@ def retrieve(self, request, *args, clean_template_id=None, **kwargs):
],
"bk_biz_id": 0,
"visible_bk_biz_id": [],
"visible_type": "current_biz"
"visible_type": "current_biz",
"alias_settings":[
{
"field_name":"__ext.io_kubernetes_pod",
"query_alias":"k8s_pod",
"path_type":"keyword"
},
{
"field_name":"__ext.io_kubernetes_ip",
"query_alias":"k8s_ip",
"path_type":"keyword"
}
],
},
"result":true
}
Expand Down Expand Up @@ -407,7 +419,19 @@ def update(self, request, *args, clean_template_id=None, **kwargs):
],
"bk_biz_id": 0,
"visible_bk_biz_id": [1, 2, 3],
"visible_type": "multi_biz"
"visible_type": "multi_biz",
"alias_settings":[
{
"field_name":"__ext.io_kubernetes_pod",
"query_alias":"k8s_pod",
"path_type":"keyword"
},
{
"field_name":"__ext.io_kubernetes_ip",
"query_alias":"k8s_ip",
"path_type":"keyword"
}
],
}
@apiSuccessExample {json} 成功返回
{
Expand Down Expand Up @@ -467,7 +491,19 @@ def create(self, request, *args, **kwargs):
],
"bk_biz_id": 0,
"visible_bk_biz_id": [1, 2, 3],
"visible_type": "multi_biz"
"visible_type": "multi_biz",
"alias_settings":[
{
"field_name":"__ext.io_kubernetes_pod",
"query_alias":"k8s_pod",
"path_type":"keyword"
},
{
"field_name":"__ext.io_kubernetes_ip",
"query_alias":"k8s_ip",
"path_type":"keyword"
}
],
}
@apiSuccessExample {json} 成功返回
{
Expand Down
10 changes: 9 additions & 1 deletion bklog/apps/log_databus/views/collector_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,10 @@ def update_or_create_clean_config(self, request, collector_config_id=None):
@apiParam {String} assessment_config.log_assessment 单机日志量
@apiParam {Boolean} assessment_config.need_approval 需要审批
@apiParam {List} assessment_config.approvals 审批人
@apiParam {Object[]} alias_settings 别名配置
@apiParam {String} alias_settings.field_name 原字段名
@apiParam {String} alias_settings.query_alias 别名
@apiParam {String} alias_settings.path_type 字段类型
@apiParamExample {json} 请求样例:
{
"table_id": "xxx",
Expand Down Expand Up @@ -2389,6 +2393,10 @@ def fast_update(self, request, collector_config_id):
@apiParam {Int} retention 保留时间
@apiParam {Int} storage_replies 副本数量
@apiParam {Int} es_shards es分片数量
@apiParam {Object[]} alias_settings 别名配置
@apiParam {String} alias_settings.field_name 原字段名
@apiParam {String} alias_settings.query_alias 别名
@apiParam {String} alias_settings.path_type 字段类型
@apiParamExample {json} 请求样例:
{
"collector_config_name": "xxx",
Expand Down Expand Up @@ -2449,7 +2457,7 @@ def collector_batch_operation(self, request):
collector_config_ids = params["collector_config_ids"]
operation_type = params["operation_type"]
return Response(CollectorBatchHandler(collector_config_ids, operation_type).batch_operation(params))

@list_route(methods=["GET"], url_path="search_object_attribute")
def search_object_attribute(self, request):
return Response(CollectorHandler.search_object_attribute())
1 change: 1 addition & 0 deletions bklog/apps/log_search/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,7 @@ class OperatorEnum:

# 字段分析支持列表下载的最大数
MAX_FIELD_VALUE_LIST_NUM = 10000

# SQL模板
SQL_PREFIX = "SELECT DATE_TRUNC(MAX(dtEventTime), 'minute') AS dtEventTime, COUNT(*) AS log_count"
SQL_SUFFIX = "GROUP BY minute1 ORDER BY minute1 DESC LIMIT 10"
Expand Down
21 changes: 21 additions & 0 deletions bklog/apps/log_search/handlers/search/mapping_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from apps.feature_toggle.plugins.constants import DIRECT_ESQUERY_SEARCH
from apps.log_clustering.handlers.dataflow.constants import PATTERN_SEARCH_FIELDS
from apps.log_clustering.models import ClusteringConfig
from apps.log_databus.models import CollectorConfig
from apps.log_search.constants import (
BKDATA_ASYNC_CONTAINER_FIELDS,
BKDATA_ASYNC_FIELDS,
Expand Down Expand Up @@ -808,6 +809,12 @@ def _combine_description_field(self, fields_list=None, scope=None):
if _field_name:
schema_dict.update({_field_name: temp_dict})

alias_dict = dict()
remove_field_list = list()
collector_config = CollectorConfig.objects.filter(index_set_id=self.index_set_id).first()
if collector_config:
data = TransferApi.get_result_table({"table_id": collector_config.table_id})
alias_dict = data.get("query_alias_settings", dict())
# 增加description别名字段
for _field in fields_list:
a_field_name = _field.get("field_name", "")
Expand Down Expand Up @@ -841,6 +848,20 @@ def _combine_description_field(self, fields_list=None, scope=None):
}
)

# 添加别名信息
for alias_name, info in alias_dict.items():
if a_field_name == info.get("path"):
_field["query_alias"] = alias_name

# 别名字段
if _field.get("field_type") == "alias":
remove_field_list.append(_field)

# 移除不展示的别名字段
for field in remove_field_list:
if field in fields_list:
fields_list.remove(field)

return fields_list

def get_bkdata_schema(self, index: str) -> list:
Expand Down
24 changes: 23 additions & 1 deletion bklog/apps/log_search/handlers/search/search_handlers_esquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from django.core.cache import cache
from django.utils.translation import ugettext as _

from apps.api import BcsApi, BkLogApi, MonitorApi
from apps.api import BcsApi, BkLogApi, MonitorApi, TransferApi
from apps.api.base import DataApiRetryClass
from apps.exceptions import ApiRequestError, ApiResultError
from apps.feature_toggle.handlers.toggle import FeatureToggleObject
Expand Down Expand Up @@ -610,6 +610,28 @@ def search(self, search_type="default", is_export=False):
if _scroll_id:
result.update({"scroll_id": _scroll_id})

# 补充别名信息
log_list = result.get("list")
collector_config = CollectorConfig.objects.filter(index_set_id=self.index_set_id).first()
if collector_config:
data = TransferApi.get_result_table({"table_id": collector_config.table_id})
alias_dict = data.get("query_alias_settings")
if alias_dict:
for log in log_list:
for query_alias, info in alias_dict.items():
sub_field = info.get("path")
if "." not in sub_field:
if sub_field in log:
log[query_alias] = log[sub_field]
else:
context = log
# 处理嵌套字段
while "." in sub_field:
prefix, sub_field = sub_field.split(".", 1)
context = context.get(prefix, {})
if sub_field in context:
log[query_alias] = context[sub_field]
break
return result

def get_sort_group(self):
Expand Down

0 comments on commit 4ce8299

Please sign in to comment.