diff --git a/apps/backend/components/collections/base.py b/apps/backend/components/collections/base.py index 867b5cd1f..b93e6c47f 100644 --- a/apps/backend/components/collections/base.py +++ b/apps/backend/components/collections/base.py @@ -39,7 +39,7 @@ from apps.node_man import constants, models from apps.prometheus import metrics from apps.prometheus.helper import SetupObserve -from apps.utils import cache, local, time_handler, translation +from apps.utils import cache, time_handler, translation from apps.utils.exc import ExceptionHandler from pipeline.core.flow import Service @@ -94,9 +94,7 @@ def service_run_exc_handler( sub_inst_ids = instance.get_subscription_instance_ids(data) code = instance.__class__.__name__ - metrics.app_task_engine_service_run_exceptions_total.labels( - hostname=local.get_hostname(), code=code, **parse_exception(exc) - ).inc() + metrics.app_task_engine_service_run_exceptions_total.labels(code=code, **parse_exception(exc)).inc() logger.exception(f"[task_engine][service_run_exc_handler:{code}] act_name -> {act_name}, exc -> {str(exc)}") @@ -128,7 +126,7 @@ def get_language_func( def get_labels_func( wrapped: Callable, instance: "BaseService", args: Tuple[Any], kwargs: Dict[str, Any] ) -> typing.Dict[str, str]: - return {"code": instance.__class__.__name__, "hostname": local.get_hostname()} + return {"code": instance.__class__.__name__} class LogMixin: @@ -274,9 +272,7 @@ def bulk_set_sub_inst_status(self, status: str, sub_inst_ids: Union[List[int], S models.SubscriptionInstanceRecord.objects.filter(id__in=sub_inst_ids).update( status=status, update_time=timezone.now() ) - metrics.app_task_engine_sub_inst_statuses_total.labels(hostname=local.get_hostname(), status=status).inc( - len(sub_inst_ids) - ) + metrics.app_task_engine_sub_inst_statuses_total.labels(status=status).inc(len(sub_inst_ids)) if status in [constants.JobStatusType.FAILED]: self.sub_inst_failed_handler(sub_inst_ids) diff --git a/apps/backend/subscription/tasks.py b/apps/backend/subscription/tasks.py index cb073e97a..fbd33ca2c 100644 --- a/apps/backend/subscription/tasks.py +++ b/apps/backend/subscription/tasks.py @@ -32,7 +32,7 @@ from apps.node_man import tools as node_man_tools from apps.node_man.handlers.cmdb import CmdbHandler from apps.prometheus import metrics -from apps.utils import local, translation +from apps.utils import translation from pipeline import builder from pipeline.builder import Data, NodeOutput, ServiceActivity, Var from pipeline.core.pipeline import Pipeline @@ -319,7 +319,6 @@ def create_task( if instance_info["host"].get("bk_host_id") } - hostname = local.get_hostname() # 前置错误需要跳过的主机,不创建订阅任务实例 error_hosts = [] # 批量创建订阅实例执行记录 @@ -462,9 +461,9 @@ def create_task( ) ) - metrics.app_task_engine_sub_inst_statuses_total.labels( - hostname=hostname, status=constants.JobStatusType.PENDING - ).inc(amount=len(created_instance_records)) + metrics.app_task_engine_sub_inst_statuses_total.labels(status=constants.JobStatusType.PENDING).inc( + amount=len(created_instance_records) + ) task_host_limit = models.GlobalSettings.get_config( models.GlobalSettings.KeyEnum.TASK_HOST_LIMIT.value, default=TASK_HOST_LIMIT diff --git a/apps/backend/subscription/tools.py b/apps/backend/subscription/tools.py index e8d0264e6..ec321eea4 100644 --- a/apps/backend/subscription/tools.py +++ b/apps/backend/subscription/tools.py @@ -351,26 +351,26 @@ def get_service_instance_by_ids(bk_biz_id, ids): return result -@FuncCacheDecorator(cache_time=5 * constants.TimeUnit.MINUTE) -def fetch_biz_info_map(fields: typing.Optional[typing.List[str]] = None) -> typing.Dict[int, typing.Dict]: +@FuncCacheDecorator(cache_time=1 * constants.TimeUnit.MINUTE) +def fetch_biz_info_map(fields: typing.Optional[typing.List[str]] = None) -> typing.Dict[str, typing.Dict]: """ 查询所有业务 :return: 主机业务关系列表 """ fields = fields or ["bk_biz_id", "bk_biz_name"] biz_infos: typing.List[typing.Dict] = batch_request(client_v2.cc.search_business, {"fields": fields}) - biz_infos.append({"bk_biz_id": settings.BK_CMDB_RESOURCE_POOL_BIZ_ID, "bk_biz_name": "资源池"}) + biz_infos.append({"bk_biz_id": str(settings.BK_CMDB_RESOURCE_POOL_BIZ_ID), "bk_biz_name": "资源池"}) - biz_info_map: typing.Dict[int, typing.Dict] = {biz_info["bk_biz_id"]: biz_info for biz_info in biz_infos} + biz_info_map: typing.Dict[str, typing.Dict] = {str(biz_info["bk_biz_id"]): biz_info for biz_info in biz_infos} logger.info("[fetch_biz_info_map] fields -> %s, count -> %s", pprint.pformat(fields), len(biz_infos)) return biz_info_map def fetch_biz_info(bk_biz_ids: typing.List[int]) -> typing.Dict[int, typing.Dict]: - biz_info_map: typing.Dict[int, typing.Dict] = fetch_biz_info_map(get_cache=True) + biz_info_map: typing.Dict[str, typing.Dict] = fetch_biz_info_map(get_cache=True) if not biz_info_map: logger.error("[fetch_biz_info] biz_info_map is empty !") - return {bk_biz_id: biz_info_map.get(bk_biz_id) or {} for bk_biz_id in bk_biz_ids} + return {bk_biz_id: biz_info_map.get(str(bk_biz_id)) or {} for bk_biz_id in bk_biz_ids} def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: int = None): @@ -401,7 +401,7 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: call_func, dict(bk_set_template_ids=template_ids, bk_biz_id=bk_biz_id, fields=fields) ) biz_info = fetch_biz_info([bk_biz_id]) - cloud_id_name_map = models.Cloud.cloud_id_name_map() + cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True) if not biz_info[bk_biz_id]: logger.warning("[get_host_detail_by_template] can not find biz_info -> %s", bk_biz_id) @@ -409,7 +409,7 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: for host in host_info_result: host["bk_biz_id"] = bk_biz_id host["bk_biz_name"] = host["bk_biz_name"] = biz_info[bk_biz_id].get("bk_biz_name") - host["bk_cloud_name"] = cloud_id_name_map.get(host["bk_cloud_id"]) + host["bk_cloud_name"] = cloud_id_name_map.get(str(host["bk_cloud_id"])) return host_info_result @@ -564,7 +564,7 @@ def get_host_detail(host_info_list: list, bk_biz_id: int = None): logger.warning("[get_host_detail] can not find biz_info -> %s", _host["bk_biz_id"]) _host["bk_cloud_name"] = ( - cloud_id_name_map.get(_host["bk_cloud_id"], "") + cloud_id_name_map.get(str(_host["bk_cloud_id"]), "") if _host["bk_cloud_id"] != constants.DEFAULT_CLOUD else "直连区域" ) diff --git a/apps/iam/handlers/resources.py b/apps/iam/handlers/resources.py index 3c9ec3dcd..67a20e8cf 100644 --- a/apps/iam/handlers/resources.py +++ b/apps/iam/handlers/resources.py @@ -119,9 +119,9 @@ def create_instance(cls, instance_id: str, attribute=None) -> Resource: @classmethod def create_instances(cls, instance_ids: Union[List[str], Set[str]], attribute=None) -> List[Resource]: - cloud_id_name_map = models.Cloud.cloud_id_name_map() + cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True) return [ - cls.create_instance(instance_id, {"name": cloud_id_name_map.get(int(instance_id))}) + cls.create_instance(instance_id, {"name": cloud_id_name_map.get(str(instance_id))}) for instance_id in instance_ids ] diff --git a/apps/node_man/handlers/job.py b/apps/node_man/handlers/job.py index dfaf347a6..e45ddc525 100644 --- a/apps/node_man/handlers/job.py +++ b/apps/node_man/handlers/job.py @@ -846,12 +846,12 @@ def retrieve(self, params: Dict[str, Any]): host_execute_status_list.extend(filter_hosts) # 补充业务名、管控区域名称 - cloud_id_name_map = models.Cloud.cloud_id_name_map() + cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True) biz_name_map = CmdbHandler.biz_id_name_without_permission() for host_execute_status in host_execute_status_list: host_execute_status.update( bk_biz_name=biz_name_map.get(host_execute_status.get("bk_biz_id")), - bk_cloud_name=cloud_id_name_map.get(host_execute_status["bk_cloud_id"]), + bk_cloud_name=cloud_id_name_map.get(str(host_execute_status["bk_cloud_id"])), ) tools.JobTools.update_job_statistics(self.data, task_result["status_counter"]) diff --git a/apps/node_man/handlers/policy.py b/apps/node_man/handlers/policy.py index e0f7ae5e9..8ad07843d 100644 --- a/apps/node_man/handlers/policy.py +++ b/apps/node_man/handlers/policy.py @@ -509,7 +509,7 @@ def migrate_preview(cls, query_params: Dict[str, Any]) -> List[Dict[str, Any]]: ) # 补充业务名、管控区域名称 - cloud_id_name_map = models.Cloud.cloud_id_name_map() + cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True) biz_name_map = CmdbHandler.biz_id_name_without_permission() results = [] @@ -517,7 +517,7 @@ def migrate_preview(cls, query_params: Dict[str, Any]) -> List[Dict[str, Any]]: for instance in instances: instance.update( bk_biz_name=biz_name_map.get(instance.get("bk_biz_id")), - bk_cloud_name=cloud_id_name_map.get(instance["bk_cloud_id"]), + bk_cloud_name=cloud_id_name_map.get(str(instance["bk_cloud_id"])), ) inst_job_type = constants.ACTION_NAME_JOB_TYPE_MAP.get(action_id) diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 7f0f42485..6841481b6 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -719,11 +719,12 @@ class Cloud(models.Model): @classmethod @FuncCacheDecorator(cache_time=20 * constants.TimeUnit.SECOND) - def cloud_id_name_map(cls) -> Dict: + def cloud_id_name_map(cls) -> Dict[str, str]: all_cloud_map = { - cloud.bk_cloud_id: cloud.bk_cloud_name for cloud in cls.objects.all().only("bk_cloud_id", "bk_cloud_name") + str(cloud.bk_cloud_id): cloud.bk_cloud_name + for cloud in cls.objects.all().only("bk_cloud_id", "bk_cloud_name") } - all_cloud_map[constants.DEFAULT_CLOUD] = str(_("直连区域")) + all_cloud_map[str(constants.DEFAULT_CLOUD)] = str(_("直连区域")) return all_cloud_map @classmethod diff --git a/apps/prometheus/metrics.py b/apps/prometheus/metrics.py index e262d39a7..027809891 100644 --- a/apps/prometheus/metrics.py +++ b/apps/prometheus/metrics.py @@ -103,41 +103,40 @@ def get_histogram_buckets_from_env(env_name): app_task_engine_running_executes_info = Gauge( name="app_task_engine_running_executes", - documentation="Number of engine running executes per hostname, per code.", - labelnames=["hostname", "code"], + documentation="Number of engine running executes per code.", + labelnames=["code"], ) app_task_engine_running_schedules_info = Gauge( name="app_task_engine_running_schedules", - documentation="Number of engine running schedules per hostname, per code.", - labelnames=["hostname", "code"], + documentation="Number of engine running schedules per code.", + labelnames=["code"], ) app_task_engine_execute_duration_seconds = Histogram( name="app_task_engine_execute_duration_seconds", - documentation="Histogram of the time (in seconds) each engine execute per hostname, per code.", + documentation="Histogram of the time (in seconds) each engine execute per code.", buckets=get_histogram_buckets_from_env("BKAPP_MONITOR_METRICS_ENGINE_BUCKETS"), - labelnames=["hostname", "code"], + labelnames=["code"], ) app_task_engine_schedule_duration_seconds = Histogram( name="app_task_engine_schedule_duration_seconds", - documentation="Histogram of the time (in seconds) each engine schedule per hostname, per code.", + documentation="Histogram of the time (in seconds) each engine schedule per code.", buckets=get_histogram_buckets_from_env("BKAPP_MONITOR_METRICS_ENGINE_BUCKETS"), - labelnames=["hostname", "code"], + labelnames=["code"], ) app_task_engine_service_run_exceptions_total = Counter( name="app_task_engine_service_run_exceptions_total", - documentation="Cumulative count of engine service run exceptions " - "per hostname, per code, per exc_type, per exc_code.", - labelnames=["hostname", "code", "exc_type", "exc_code"], + documentation="Cumulative count of engine service run exceptions " "per code, per exc_type, per exc_code.", + labelnames=["code", "exc_type", "exc_code"], ) app_task_engine_sub_inst_statuses_total = Counter( name="app_task_engine_sub_inst_statuses_total", - documentation="Cumulative count of engine subscription instance statuses per hostname, per status .", - labelnames=["hostname", "status"], + documentation="Cumulative count of engine subscription instance statuses per status .", + labelnames=["status"], ) app_task_engine_get_common_data_duration_seconds = Histogram( @@ -197,8 +196,8 @@ def get_histogram_buckets_from_env(env_name): app_core_remote_proxy_info = Gauge( name="app_core_remote_proxy_info", - documentation="A metric with a constants '1' value labeled by hostname, ip, bk_cloud_id, paramiko_version", - labelnames=["hostname", "ip", "bk_cloud_id", "paramiko_version"], + documentation="A metric with a constants '1' value labeled by proxy_name, proxy_ip, bk_cloud_id, paramiko_version", + labelnames=["proxy_name", "proxy_ip", "bk_cloud_id", "paramiko_version"], ) app_core_cache_decorator_requests_total = Counter( diff --git a/apps/prometheus/reporter.py b/apps/prometheus/reporter.py new file mode 100644 index 000000000..662f02b48 --- /dev/null +++ b/apps/prometheus/reporter.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging +import os +import time +from typing import Dict, Optional + +from bk_monitor_report import MonitorReporter as Reporter +from celery.utils.nodenames import gethostname, host_format +from prometheus_client import REGISTRY, CollectorRegistry, generate_latest +from prometheus_client.parser import text_string_to_metric_families + +logger = logging.getLogger("bk-monitor-report") + + +class MonitorReporter(Reporter): + def __init__( + self, + data_id: int, + access_token: str, + target: str, + url: str, + report_interval: int = 60, + chunk_size: int = 500, + registry: Optional[CollectorRegistry] = REGISTRY, + proc_type: str = "", + instance_tmpl: str = "", + ): + super().__init__(data_id, access_token, target, url, report_interval, chunk_size, registry) + self.proc_type = proc_type + self.instance_tmpl = instance_tmpl + + def generate_addition_labels(self) -> Dict[str, str]: + addition_labels: Dict[str, str] = {"hostname": gethostname()} + if self.proc_type == "celery": + addition_labels["instance"] = host_format(self.instance_tmpl) + else: + addition_labels["instance"] = host_format(self.instance_tmpl, P=str(os.getpid())) + return addition_labels + + def generate_chunked_report_data(self): + timestamp = round(time.time() * 1000) + + addition_labels = self.generate_addition_labels() + data = {"data_id": self.data_id, "access_token": self.access_token, "data": []} + size = 0 + metrics_text = generate_latest(self.registry).decode("utf-8") + for family in text_string_to_metric_families(metrics_text): + for sample in family.samples: + labels = sample.labels or {} + # 补充维度 + labels.update(addition_labels) + data["data"].append( + { + "metrics": {sample.name: sample.value}, + "target": self.target, + "dimension": labels, + "timestamp": timestamp, + } + ) + size += 1 + if size % self.chunk_size == 0: + yield data + data = {"data_id": self.data_id, "access_token": self.access_token, "data": []} + + if data["data"]: + yield data diff --git a/config/patchers/monitor_reporter.py b/config/patchers/monitor_reporter.py index 19e96986c..93f76961b 100644 --- a/config/patchers/monitor_reporter.py +++ b/config/patchers/monitor_reporter.py @@ -39,10 +39,22 @@ def monitor_report_config(): sys.stdout.write("[!]can't found er queue in command: %s, skip celery monitor report config\n" % boot_cmd) return - from bk_monitor_report import MonitorReporter # noqa + proc_type = "celery" + try: + n_conf_index = sys.argv.index("-n") + except ValueError as e: + # 没有 get 到,说明是单 workers 场景 + instance_tmpl = "celery@%h-%i" + sys.stdout.write("[!]can't found -n option in command: %s, use default -> celery@xxx: %s\n" % (boot_cmd, e)) + else: + # %i 区分单 workers 多进程的情况 + # -n 区分单主机多 workers 的情况 + instance_tmpl = sys.argv[n_conf_index + 1] + "-%i" + from bk_monitor_report.contrib.celery import MonitorReportStep # noqa from apps.backend.celery import app as celery_app # noqa + from apps.prometheus.reporter import MonitorReporter # noqa reporter = MonitorReporter( data_id=env.BKAPP_MONITOR_REPORTER_DATA_ID, # 监控 Data ID @@ -51,6 +63,8 @@ def monitor_report_config(): url=env.BKAPP_MONITOR_REPORTER_URL, # 上报地址 report_interval=env.BKAPP_MONITOR_REPORTER_REPORT_INTERVAL, # 上报周期,秒 chunk_size=env.BKAPP_MONITOR_REPORTER_CHUNK_SIZE, # 上报指标分块大小 + proc_type=proc_type, + instance_tmpl=instance_tmpl, ) # 针对多进程worker需要做特殊梳理,在worker进程中进行reporter start @@ -63,16 +77,43 @@ def monitor_report_config(): worker_process_init.connect(reporter.start, weak=False) + sys.stdout.write( + "[Monitor reporter] init success, proc_type -> %s, instance_tmpl -> %s \n" % (proc_type, instance_tmpl) + ) + sys.stdout.write("[Monitor reporter] init success\n") - elif ( - "gunicorn" in boot_cmd - or "runserver" in boot_cmd - or "sync_host_event" in boot_cmd - or "sync_host_relation_event" in boot_cmd - or "apply_resource_watched_events" in boot_cmd - ): - from bk_monitor_report import MonitorReporter # noqa + else: + from apps.prometheus.reporter import MonitorReporter # noqa + + match_proc_name = None + proc_names = [ + "gunicorn", + "runserver", + "sync_host_event", + "sync_host_relation_event", + "sync_process_event", + "apply_resource_watched_events", + ] + + for proc_name in proc_names: + if proc_name in boot_cmd: + match_proc_name = proc_name + break + + if not match_proc_name: + sys.stdout.write("[!]unknown boot cmd: %s, skip monitor report config\n" % boot_cmd) + return + else: + sys.stdout.write("[Monitor reporter] match_proc_name %s \n" % match_proc_name) + + if match_proc_name in ["gunicorn", "runserver"]: + proc_type = "web" + instance_tmpl = str(match_proc_name) + "@%h-%P" + else: + # 单进程运行,无需 pid + proc_type = "sync" + instance_tmpl = str(match_proc_name) + "@%h" reporter = MonitorReporter( data_id=env.BKAPP_MONITOR_REPORTER_DATA_ID, # 监控 Data ID @@ -81,10 +122,11 @@ def monitor_report_config(): url=env.BKAPP_MONITOR_REPORTER_URL, # 上报地址 report_interval=env.BKAPP_MONITOR_REPORTER_REPORT_INTERVAL, # 上报周期,秒 chunk_size=env.BKAPP_MONITOR_REPORTER_CHUNK_SIZE, # 上报指标分块大小 + proc_type=proc_type, + instance_tmpl=instance_tmpl, ) reporter.start() - sys.stdout.write("[Monitor reporter] init success \n") - - else: - sys.stdout.write("[!]unknown boot cmd: %s, skip monitor report config\n" % boot_cmd) + sys.stdout.write( + "[Monitor reporter] init success, proc_type -> %s, instance_tmpl -> %s \n" % (proc_type, instance_tmpl) + ) diff --git a/script_tools/setup_pagent2.py b/script_tools/setup_pagent2.py index 0f1627e74..17d768ec0 100644 --- a/script_tools/setup_pagent2.py +++ b/script_tools/setup_pagent2.py @@ -751,8 +751,8 @@ def _run( logger.logging("proxy", "Failed to get paramiko version", is_report=False, level=logging.WARNING) _app_core_remote_proxy_info_labels = { - "hostname": socket.gethostname(), - "ip": args.lan_eth_ip, + "proxy_name": socket.gethostname(), + "proxy_ip": args.lan_eth_ip, "bk_cloud_id": args.host_cloud, "paramiko_version": _paramiko_version, }