diff --git a/bklog/apps/grafana/handlers/query.py b/bklog/apps/grafana/handlers/query.py
index a5ab3ebbe4..23465e8795 100644
--- a/bklog/apps/grafana/handlers/query.py
+++ b/bklog/apps/grafana/handlers/query.py
@@ -29,13 +29,14 @@
 from django.conf import settings
 from django.utils.translation import ugettext_lazy as _
 
-from apps.api import CCApi
+from apps.api import CCApi, TransferApi
 from apps.grafana.constants import (
     CMDB_EXTEND_FIELDS,
     LOG_SEARCH_DIMENSION_LIST,
     TIME_SERIES_FIELD_TYPE,
 )
 from apps.iam import ActionEnum, Permission, ResourceEnum
+from apps.log_databus.models import CollectorConfig
 from apps.log_desensitize.handlers.desensitize import DesensitizeHandler
 from apps.log_desensitize.models import DesensitizeFieldConfig
 from apps.log_search.constants import GlobalCategoriesEnum
@@ -448,11 +449,22 @@ def get_metric_list(self, category_id=None):
                 "dimension_fields": [],
             }
 
+            alias_dict = dict()
+            collector_config = CollectorConfig.objects.filter(index_set_id=index_set.index_set_id).first()
+            if collector_config:
+                data = TransferApi.get_result_table({"table_id": collector_config.table_id})
+                alias_dict = data.get("query_alias_settings", dict())
+
             for field_info in fields.get("fields", []):
                 field_id = field_description = field_info["field_name"]
                 if field_info.get("description"):
                     field_description = field_info["description"]
 
+                for alias_name, info in alias_dict.items():
+                    # 有配置别名,修改name
+                    if field_id == info.get("path"):
+                        field_description = alias_name
+
                 if field_info["es_doc_values"] and field_info.get("field_type") != "date":
                     metric_conf["dimension_fields"].append({"id": field_id, "name": field_description})
 
diff --git a/bklog/apps/log_databus/handlers/clean.py b/bklog/apps/log_databus/handlers/clean.py
index 75bedc50de..16b0b0d0e8 100644
--- a/bklog/apps/log_databus/handlers/clean.py
+++ b/bklog/apps/log_databus/handlers/clean.py
@@ -96,6 +96,7 @@ def create_or_update(self, params: dict):
             "etl_params": params["etl_params"],
             "etl_fields": params["etl_fields"],
             "bk_biz_id": params["bk_biz_id"],
+            "alias_settings": params["alias_settings"]
         }
         if params.get("visible_type"):
             model_fields["visible_type"] = params["visible_type"]
diff --git a/bklog/apps/log_databus/handlers/collector.py b/bklog/apps/log_databus/handlers/collector.py
index 5a07892225..5a86518b26 100644
--- a/bklog/apps/log_databus/handlers/collector.py
+++ b/bklog/apps/log_databus/handlers/collector.py
@@ -447,6 +447,11 @@ def retrieve(self, use_request=True):
         for process in self.RETRIEVE_CHAIN:
             collector_config = getattr(self, process, lambda x, y: x)(collector_config, context)
             logger.info(f"[databus retrieve] process => [{process}] collector_config => [{collector_config}]")
+        if self.data.table_id:
+            result_table = TransferApi.get_result_table({"table_id": self.data.table_id})
+            alias_dict = result_table.get("query_alias_settings", dict())
+            if alias_dict:
+                collector_config.update({"alias_settings": alias_dict})
 
         # 添加索引集相关信息
         log_index_set_obj = LogIndexSet.objects.filter(collector_config_id=self.collector_config_id).first()
diff --git a/bklog/apps/log_databus/handlers/etl/base.py b/bklog/apps/log_databus/handlers/etl/base.py
index 3f3ba5712b..80c6836ebc 100644
--- a/bklog/apps/log_databus/handlers/etl/base.py
+++ b/bklog/apps/log_databus/handlers/etl/base.py
@@ -162,6 +162,7 @@ def update_or_create(
         etl_params=None,
         fields=None,
         username="",
+        alias_settings=None,
     ):
         # 停止状态下不能编辑
         if self.data and not self.data.is_active:
@@ -219,6 +220,7 @@ def update_or_create(
             etl_params=etl_params,
             es_version=cluster_info["cluster_config"]["version"],
             hot_warm_config=cluster_info["cluster_config"].get("custom_option", {}).get("hot_warm_config"),
+            alias_settings=alias_settings,
        )
 
         if not view_roles:
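Note on the grafana/handlers/query.py hunk above: the dimension list now prefers a configured alias over the field description. A dependency-free sketch of that substitution; the `{alias: {"path": original_field}}` shape is inferred from how the hunk iterates `query_alias_settings` (the real values come from `TransferApi.get_result_table`), so treat it as an assumption rather than the documented API response:

```python
# Sketch: prefer a configured alias when naming Grafana dimension fields.
# alias_settings shape ({alias: {"path": field}}) is assumed from the hunk's usage.
def apply_alias(fields, alias_settings):
    dimension_fields = []
    for field_info in fields:
        field_id = field_description = field_info["field_name"]
        if field_info.get("description"):
            field_description = field_info["description"]
        for alias_name, info in alias_settings.items():
            if field_id == info.get("path"):
                field_description = alias_name  # alias wins over the description
        if field_info.get("es_doc_values") and field_info.get("field_type") != "date":
            dimension_fields.append({"id": field_id, "name": field_description})
    return dimension_fields


print(apply_alias(
    [{"field_name": "__ext.io_kubernetes_pod", "es_doc_values": True, "field_type": "keyword"}],
    {"k8s_pod": {"path": "__ext.io_kubernetes_pod"}},
))
# -> [{'id': '__ext.io_kubernetes_pod', 'name': 'k8s_pod'}]
```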
diff --git a/bklog/apps/log_databus/handlers/etl/transfer.py b/bklog/apps/log_databus/handlers/etl/transfer.py
index 88cb75ffdd..f9ded3f828 100644
--- a/bklog/apps/log_databus/handlers/etl/transfer.py
+++ b/bklog/apps/log_databus/handlers/etl/transfer.py
@@ -57,6 +57,7 @@ def update_or_create(
         sort_fields=None,
         target_fields=None,
         username="",
+        alias_settings=None,
         *args,
         **kwargs,
     ):
@@ -123,6 +124,7 @@ def update_or_create(
             es_version=cluster_info["cluster_config"]["version"],
             hot_warm_config=cluster_info["cluster_config"].get("custom_option", {}).get("hot_warm_config"),
             es_shards=es_shards,
+            alias_settings=alias_settings,
         )
 
         if not view_roles:
diff --git a/bklog/apps/log_databus/handlers/etl_storage/base.py b/bklog/apps/log_databus/handlers/etl_storage/base.py
index 1671a5abda..a962eb86a5 100644
--- a/bklog/apps/log_databus/handlers/etl_storage/base.py
+++ b/bklog/apps/log_databus/handlers/etl_storage/base.py
@@ -425,6 +425,7 @@ def update_or_create_result_table(
         hot_warm_config: dict = None,
         es_shards: int = settings.ES_SHARDS,
         index_settings: dict = None,
+        alias_settings: list = None,
     ):
         """
        创建或更新结果表
@@ -440,6 +441,7 @@ def update_or_create_result_table(
         :param hot_warm_config: 冷热数据配置
         :param es_shards: es分片数
         :param index_settings: 索引配置
+        :param alias_settings: 别名配置
         """
 
         from apps.log_databus.handlers.collector import build_result_table_id
@@ -561,6 +563,18 @@ def update_or_create_result_table(
             if "es_type" in field.get("option", {}) and field["option"]["es_type"] in ["text"]:
                 field["option"]["es_norms"] = False
 
+        # 别名配置
+        if alias_settings:
+            query_alias_settings = []
+            for item in alias_settings:
+                field_alias = {
+                    "field_name": item["field_name"],
+                    "query_alias": item["query_alias"],
+                    "path_type": item["path_type"],
+                }
+                query_alias_settings.append(field_alias)
+            params.update({"query_alias_settings": query_alias_settings})
+
         # 时间默认为维度
         if "time_option" in params and "es_doc_values" in params["time_option"]:
             del params["time_option"]["es_doc_values"]
diff --git a/bklog/apps/log_databus/migrations/0042_cleantemplate_alias_settings.py b/bklog/apps/log_databus/migrations/0042_cleantemplate_alias_settings.py
new file mode 100644
index 0000000000..dfdf796a62
--- /dev/null
+++ b/bklog/apps/log_databus/migrations/0042_cleantemplate_alias_settings.py
@@ -0,0 +1,18 @@
+# Generated by Django 3.2.15 on 2024-12-17 03:37
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('log_databus', '0041_auto_20240830_1524'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='cleantemplate',
+            name='alias_settings',
+            field=models.JSONField(blank=True, null=True, verbose_name='别名配置'),
+        ),
+    ]
diff --git a/bklog/apps/log_databus/models.py b/bklog/apps/log_databus/models.py
index e41cde80f8..dbcb718ab2 100644
--- a/bklog/apps/log_databus/models.py
+++ b/bklog/apps/log_databus/models.py
@@ -501,6 +501,7 @@ class CleanTemplate(SoftDeleteModel):
     bk_biz_id = models.IntegerField(_("业务id"))
     visible_type = models.CharField(_("可见类型"), max_length=64, default=VisibleEnum.CURRENT_BIZ.value)
     visible_bk_biz_id = MultiStrSplitByCommaFieldText(_("可见业务ID"), default="")
+    alias_settings = models.JSONField(_("别名配置"), null=True, blank=True)
 
     class Meta:
         verbose_name = _("清洗模板")
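Note on the etl_storage/base.py hunk above: only the three keys accepted by `AliasSettingSerializer` are copied into `query_alias_settings` before the result-table call. A minimal standalone sketch of that transformation (the `table_id` value below is a placeholder, not a real table):

```python
# Sketch: whitelist the alias keys forwarded on the result-table params.
def build_query_alias_settings(alias_settings):
    return [
        {
            "field_name": item["field_name"],
            "query_alias": item["query_alias"],
            "path_type": item["path_type"],
        }
        for item in alias_settings
    ]


params = {"table_id": "2_bklog_example"}  # placeholder table_id
alias_settings = [
    {"field_name": "__ext.io_kubernetes_pod", "query_alias": "k8s_pod", "path_type": "keyword"},
]
if alias_settings:
    params["query_alias_settings"] = build_query_alias_settings(alias_settings)
print(params)
```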
diff --git a/bklog/apps/log_databus/serializers.py b/bklog/apps/log_databus/serializers.py
index 14af2f9cd9..a81797db11 100644
--- a/bklog/apps/log_databus/serializers.py
+++ b/bklog/apps/log_databus/serializers.py
@@ -877,6 +877,12 @@ class AssessmentConfig(serializers.Serializer):
     approvals = serializers.ListField(label=_("审批人"), child=serializers.CharField(), required=True)
 
 
+class AliasSettingSerializer(serializers.Serializer):
+    field_name = serializers.CharField(label=_("原字段名"), required=True)
+    query_alias = serializers.CharField(label=_("别名"), required=True)
+    path_type = serializers.CharField(label=_("字段类型"), required=True)
+
+
 class CollectorEtlStorageSerializer(CollectorETLParamsFieldSerializer):
     table_id = serializers.CharField(label=_("结果表ID"), required=True)
     etl_config = serializers.CharField(label=_("清洗类型"), required=True)
@@ -890,6 +896,7 @@ class CollectorEtlStorageSerializer(CollectorETLParamsFieldSerializer):
     view_roles = serializers.ListField(label=_("查看权限"), required=False, default=[])
     need_assessment = serializers.BooleanField(label=_("是否需要评估配置"), required=False, default=False)
     assessment_config = AssessmentConfig(label=_("评估配置"), required=False)
+    alias_settings = AliasSettingSerializer(many=True, required=False, default=list)
 
     def validate(self, attrs):
         attrs = super().validate(attrs)
@@ -1003,6 +1010,7 @@ class CleanTemplateSerializer(serializers.Serializer):
     bk_biz_id = serializers.IntegerField(label=_("业务id"), required=True)
     visible_type = serializers.CharField(label=_("可见类型"), required=False)
     visible_bk_biz_id = serializers.ListField(label=_("可见业务ID"), required=False)
+    alias_settings = serializers.ListField(child=serializers.DictField(), label=_("别名配置"), required=False, default=list)
 
 
 class CleanTemplateDestroySerializer(serializers.Serializer):
@@ -1580,6 +1588,7 @@ class FastCollectorUpdateSerializer(CollectorETLParamsFieldSerializer):
     allocation_min_days = serializers.IntegerField(label=_("冷热数据生效时间"), required=False)
     storage_replies = serializers.IntegerField(label=_("ES副本数量"), required=False, min_value=0)
     es_shards = serializers.IntegerField(label=_("ES分片数量"), required=False, min_value=1)
+    alias_settings = AliasSettingSerializer(many=True, required=False, default=list)
 
     def validate(self, attrs):
         attrs = super().validate(attrs)
diff --git a/bklog/apps/log_databus/views/clean_views.py b/bklog/apps/log_databus/views/clean_views.py
index 228f7d8b16..5d60721edf 100644
--- a/bklog/apps/log_databus/views/clean_views.py
+++ b/bklog/apps/log_databus/views/clean_views.py
@@ -355,7 +355,19 @@ def retrieve(self, request, *args, clean_template_id=None, **kwargs):
                 ],
                 "bk_biz_id": 0,
                 "visible_bk_biz_id": [],
-                "visible_type": "current_biz"
+                "visible_type": "current_biz",
+                "alias_settings":[
+                    {
+                        "field_name":"__ext.io_kubernetes_pod",
+                        "query_alias":"k8s_pod",
+                        "path_type":"keyword"
+                    },
+                    {
+                        "field_name":"__ext.io_kubernetes_ip",
+                        "query_alias":"k8s_ip",
+                        "path_type":"keyword"
+                    }
+                ],
             },
             "result":true
         }
@@ -407,7 +419,19 @@ def update(self, request, *args, clean_template_id=None, **kwargs):
             ],
             "bk_biz_id": 0,
             "visible_bk_biz_id": [1, 2, 3],
-            "visible_type": "multi_biz"
+            "visible_type": "multi_biz",
+            "alias_settings":[
+                {
+                    "field_name":"__ext.io_kubernetes_pod",
+                    "query_alias":"k8s_pod",
+                    "path_type":"keyword"
+                },
+                {
+                    "field_name":"__ext.io_kubernetes_ip",
+                    "query_alias":"k8s_ip",
+                    "path_type":"keyword"
+                }
+            ],
         }
         @apiSuccessExample {json} 成功返回
         {
@@ -467,7 +491,19 @@ def create(self, request, *args, **kwargs):
             ],
             "bk_biz_id": 0,
             "visible_bk_biz_id": [1, 2, 3],
-            "visible_type": "multi_biz"
+            "visible_type": "multi_biz",
+            "alias_settings":[
+                {
+                    "field_name":"__ext.io_kubernetes_pod",
+                    "query_alias":"k8s_pod",
+                    "path_type":"keyword"
+                },
+                {
+                    "field_name":"__ext.io_kubernetes_ip",
+                    "query_alias":"k8s_ip",
+                    "path_type":"keyword"
+                }
+            ],
         }
         @apiSuccessExample {json} 成功返回
         {
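Note on the serializer hunks above: `AliasSettingSerializer` requires `field_name`, `query_alias` and `path_type` for every entry. A dependency-free sketch of the equivalent shape check, useful for reasoning about the payload the views accept (the error wording is illustrative, not the DRF message):

```python
# Sketch: shape check equivalent to AliasSettingSerializer (error text is illustrative).
REQUIRED_KEYS = ("field_name", "query_alias", "path_type")


def validate_alias_settings(alias_settings):
    for index, item in enumerate(alias_settings):
        for key in REQUIRED_KEYS:
            value = item.get(key)
            if not isinstance(value, str) or not value:
                raise ValueError(f"alias_settings[{index}].{key} must be a non-empty string")
    return alias_settings


validate_alias_settings([
    {"field_name": "__ext.io_kubernetes_ip", "query_alias": "k8s_ip", "path_type": "keyword"},
])
```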
"visible_type": "multi_biz", + "alias_settings":[ + { + "field_name":"__ext.io_kubernetes_pod", + "query_alias":"k8s_pod", + "path_type":"keyword" + }, + { + "field_name":"__ext.io_kubernetes_ip", + "query_alias":"k8s_ip", + "path_type":"keyword" + } + ], } @apiSuccessExample {json} 成功返回 { diff --git a/bklog/apps/log_databus/views/collector_views.py b/bklog/apps/log_databus/views/collector_views.py index e638a05687..e54bc5d23e 100644 --- a/bklog/apps/log_databus/views/collector_views.py +++ b/bklog/apps/log_databus/views/collector_views.py @@ -1292,6 +1292,10 @@ def update_or_create_clean_config(self, request, collector_config_id=None): @apiParam {String} assessment_config.log_assessment 单机日志量 @apiParam {Boolean} assessment_config.need_approval 需要审批 @apiParam {List} assessment_config.approvals 审批人 + @apiParam {Object[]} alias_settings 别名配置 + @apiParam {String} alias_settings.field_name 原字段名 + @apiParam {String} alias_settings.query_alias 别名 + @apiParam {String} alias_settings.path_type 字段类型 @apiParamExample {json} 请求样例: { "table_id": "xxx", @@ -2389,6 +2393,10 @@ def fast_update(self, request, collector_config_id): @apiParam {Int} retention 保留时间 @apiParam {Int} storage_replies 副本数量 @apiParam {Int} es_shards es分片数量 + @apiParam {Object[]} alias_settings 别名配置 + @apiParam {String} alias_settings.field_name 原字段名 + @apiParam {String} alias_settings.query_alias 别名 + @apiParam {String} alias_settings.path_type 字段类型 @apiParamExample {json} 请求样例: { "collector_config_name": "xxx", @@ -2449,7 +2457,7 @@ def collector_batch_operation(self, request): collector_config_ids = params["collector_config_ids"] operation_type = params["operation_type"] return Response(CollectorBatchHandler(collector_config_ids, operation_type).batch_operation(params)) - + @list_route(methods=["GET"], url_path="search_object_attribute") def search_object_attribute(self, request): return Response(CollectorHandler.search_object_attribute()) diff --git a/bklog/apps/log_search/constants.py b/bklog/apps/log_search/constants.py index 436a53aadf..132c6d2ddb 100644 --- a/bklog/apps/log_search/constants.py +++ b/bklog/apps/log_search/constants.py @@ -1653,6 +1653,7 @@ class OperatorEnum: # 字段分析支持列表下载的最大数 MAX_FIELD_VALUE_LIST_NUM = 10000 + # SQL模板 SQL_PREFIX = "SELECT DATE_TRUNC(MAX(dtEventTime), 'minute') AS dtEventTime, COUNT(*) AS log_count" SQL_SUFFIX = "GROUP BY minute1 ORDER BY minute1 DESC LIMIT 10" diff --git a/bklog/apps/log_search/handlers/search/mapping_handlers.py b/bklog/apps/log_search/handlers/search/mapping_handlers.py index 24d6ceff88..0bed4296d3 100644 --- a/bklog/apps/log_search/handlers/search/mapping_handlers.py +++ b/bklog/apps/log_search/handlers/search/mapping_handlers.py @@ -37,6 +37,7 @@ from apps.feature_toggle.plugins.constants import DIRECT_ESQUERY_SEARCH from apps.log_clustering.handlers.dataflow.constants import PATTERN_SEARCH_FIELDS from apps.log_clustering.models import ClusteringConfig +from apps.log_databus.models import CollectorConfig from apps.log_search.constants import ( BKDATA_ASYNC_CONTAINER_FIELDS, BKDATA_ASYNC_FIELDS, @@ -808,6 +809,12 @@ def _combine_description_field(self, fields_list=None, scope=None): if _field_name: schema_dict.update({_field_name: temp_dict}) + alias_dict = dict() + remove_field_list = list() + collector_config = CollectorConfig.objects.filter(index_set_id=self.index_set_id).first() + if collector_config: + data = TransferApi.get_result_table({"table_id": collector_config.table_id}) + alias_dict = data.get("query_alias_settings", dict()) # 增加description别名字段 for _field in 
diff --git a/bklog/apps/log_search/handlers/search/search_handlers_esquery.py b/bklog/apps/log_search/handlers/search/search_handlers_esquery.py
index 2a4585b1c7..25e2d10ed8 100644
--- a/bklog/apps/log_search/handlers/search/search_handlers_esquery.py
+++ b/bklog/apps/log_search/handlers/search/search_handlers_esquery.py
@@ -34,7 +34,7 @@
 from django.core.cache import cache
 from django.utils.translation import ugettext as _
 
-from apps.api import BcsApi, BkLogApi, MonitorApi
+from apps.api import BcsApi, BkLogApi, MonitorApi, TransferApi
 from apps.api.base import DataApiRetryClass
 from apps.exceptions import ApiRequestError, ApiResultError
 from apps.feature_toggle.handlers.toggle import FeatureToggleObject
@@ -610,6 +610,28 @@ def search(self, search_type="default", is_export=False):
         if _scroll_id:
             result.update({"scroll_id": _scroll_id})
 
+        # 补充别名信息
+        log_list = result.get("list")
+        collector_config = CollectorConfig.objects.filter(index_set_id=self.index_set_id).first()
+        if collector_config:
+            data = TransferApi.get_result_table({"table_id": collector_config.table_id})
+            alias_dict = data.get("query_alias_settings")
+            if alias_dict:
+                for log in log_list:
+                    for query_alias, info in alias_dict.items():
+                        sub_field = info.get("path")
+                        if "." not in sub_field:
+                            if sub_field in log:
+                                log[query_alias] = log[sub_field]
+                        else:
+                            context = log
+                            # 处理嵌套字段
+                            while "." in sub_field:
+                                prefix, sub_field = sub_field.split(".", 1)
+                                context = context.get(prefix, {})
+                                if sub_field in context:
+                                    log[query_alias] = context[sub_field]
+                                    break
         return result
 
     def get_sort_group(self):
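Note on the search() hunk above: aliased values are copied onto the top level of each log document, walking dotted paths such as `__ext.io_kubernetes_pod` one segment at a time. A self-contained sketch of that lookup under the same assumed `{alias: {"path": ...}}` mapping shape; like the hunk, it assumes intermediate path segments resolve to dicts:

```python
# Sketch: copy aliased values onto the top level of one log document.
def attach_aliases(log, alias_settings):
    for query_alias, info in alias_settings.items():
        sub_field = info.get("path", "")
        if "." not in sub_field:
            if sub_field in log:
                log[query_alias] = log[sub_field]
        else:
            context = log
            # walk nested objects: "__ext.io_kubernetes_pod" -> log["__ext"]["io_kubernetes_pod"]
            while "." in sub_field:
                prefix, sub_field = sub_field.split(".", 1)
                context = context.get(prefix, {})
                if sub_field in context:
                    log[query_alias] = context[sub_field]
                    break
    return log


doc = {"__ext": {"io_kubernetes_pod": "pod-0"}, "path": "/var/log/app.log"}
print(attach_aliases(doc, {"k8s_pod": {"path": "__ext.io_kubernetes_pod"}}))
# -> {'__ext': {'io_kubernetes_pod': 'pod-0'}, 'path': '/var/log/app.log', 'k8s_pod': 'pod-0'}
```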