Skip to content

Commit

Permalink
feature: 可观测建设
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Oct 8, 2023
1 parent 5d75e06 commit 12d638f
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 61 deletions.
12 changes: 4 additions & 8 deletions apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from apps.node_man import constants, models
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve
from apps.utils import cache, local, time_handler, translation
from apps.utils import cache, time_handler, translation
from apps.utils.exc import ExceptionHandler
from pipeline.core.flow import Service

Expand Down Expand Up @@ -94,9 +94,7 @@ def service_run_exc_handler(
sub_inst_ids = instance.get_subscription_instance_ids(data)
code = instance.__class__.__name__

metrics.app_task_engine_service_run_exceptions_total.labels(
hostname=local.get_hostname(), code=code, **parse_exception(exc)
).inc()
metrics.app_task_engine_service_run_exceptions_total.labels(code=code, **parse_exception(exc)).inc()

logger.exception(f"[task_engine][service_run_exc_handler:{code}] act_name -> {act_name}, exc -> {str(exc)}")

Expand Down Expand Up @@ -128,7 +126,7 @@ def get_language_func(
def get_labels_func(
wrapped: Callable, instance: "BaseService", args: Tuple[Any], kwargs: Dict[str, Any]
) -> typing.Dict[str, str]:
return {"code": instance.__class__.__name__, "hostname": local.get_hostname()}
return {"code": instance.__class__.__name__}


class LogMixin:
Expand Down Expand Up @@ -274,9 +272,7 @@ def bulk_set_sub_inst_status(self, status: str, sub_inst_ids: Union[List[int], S
models.SubscriptionInstanceRecord.objects.filter(id__in=sub_inst_ids).update(
status=status, update_time=timezone.now()
)
metrics.app_task_engine_sub_inst_statuses_total.labels(hostname=local.get_hostname(), status=status).inc(
len(sub_inst_ids)
)
metrics.app_task_engine_sub_inst_statuses_total.labels(status=status).inc(len(sub_inst_ids))
if status in [constants.JobStatusType.FAILED]:
self.sub_inst_failed_handler(sub_inst_ids)

Expand Down
9 changes: 4 additions & 5 deletions apps/backend/subscription/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from apps.node_man import tools as node_man_tools
from apps.node_man.handlers.cmdb import CmdbHandler
from apps.prometheus import metrics
from apps.utils import local, translation
from apps.utils import translation
from pipeline import builder
from pipeline.builder import Data, NodeOutput, ServiceActivity, Var
from pipeline.core.pipeline import Pipeline
Expand Down Expand Up @@ -319,7 +319,6 @@ def create_task(
if instance_info["host"].get("bk_host_id")
}

hostname = local.get_hostname()
# 前置错误需要跳过的主机,不创建订阅任务实例
error_hosts = []
# 批量创建订阅实例执行记录
Expand Down Expand Up @@ -462,9 +461,9 @@ def create_task(
)
)

metrics.app_task_engine_sub_inst_statuses_total.labels(
hostname=hostname, status=constants.JobStatusType.PENDING
).inc(amount=len(created_instance_records))
metrics.app_task_engine_sub_inst_statuses_total.labels(status=constants.JobStatusType.PENDING).inc(
amount=len(created_instance_records)
)

task_host_limit = models.GlobalSettings.get_config(
models.GlobalSettings.KeyEnum.TASK_HOST_LIMIT.value, default=TASK_HOST_LIMIT
Expand Down
18 changes: 9 additions & 9 deletions apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,26 +351,26 @@ def get_service_instance_by_ids(bk_biz_id, ids):
return result


@FuncCacheDecorator(cache_time=5 * constants.TimeUnit.MINUTE)
def fetch_biz_info_map(fields: typing.Optional[typing.List[str]] = None) -> typing.Dict[int, typing.Dict]:
@FuncCacheDecorator(cache_time=1 * constants.TimeUnit.MINUTE)
def fetch_biz_info_map(fields: typing.Optional[typing.List[str]] = None) -> typing.Dict[str, typing.Dict]:
"""
查询所有业务
:return: 主机业务关系列表
"""
fields = fields or ["bk_biz_id", "bk_biz_name"]
biz_infos: typing.List[typing.Dict] = batch_request(client_v2.cc.search_business, {"fields": fields})
biz_infos.append({"bk_biz_id": settings.BK_CMDB_RESOURCE_POOL_BIZ_ID, "bk_biz_name": "资源池"})
biz_infos.append({"bk_biz_id": str(settings.BK_CMDB_RESOURCE_POOL_BIZ_ID), "bk_biz_name": "资源池"})

biz_info_map: typing.Dict[int, typing.Dict] = {biz_info["bk_biz_id"]: biz_info for biz_info in biz_infos}
biz_info_map: typing.Dict[str, typing.Dict] = {str(biz_info["bk_biz_id"]): biz_info for biz_info in biz_infos}
logger.info("[fetch_biz_info_map] fields -> %s, count -> %s", pprint.pformat(fields), len(biz_infos))
return biz_info_map


def fetch_biz_info(bk_biz_ids: typing.List[int]) -> typing.Dict[int, typing.Dict]:
biz_info_map: typing.Dict[int, typing.Dict] = fetch_biz_info_map(get_cache=True)
biz_info_map: typing.Dict[str, typing.Dict] = fetch_biz_info_map(get_cache=True)
if not biz_info_map:
logger.error("[fetch_biz_info] biz_info_map is empty !")
return {bk_biz_id: biz_info_map.get(bk_biz_id) or {} for bk_biz_id in bk_biz_ids}
return {bk_biz_id: biz_info_map.get(str(bk_biz_id)) or {} for bk_biz_id in bk_biz_ids}


def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: int = None):
Expand Down Expand Up @@ -401,15 +401,15 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id:
call_func, dict(bk_set_template_ids=template_ids, bk_biz_id=bk_biz_id, fields=fields)
)
biz_info = fetch_biz_info([bk_biz_id])
cloud_id_name_map = models.Cloud.cloud_id_name_map()
cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True)

if not biz_info[bk_biz_id]:
logger.warning("[get_host_detail_by_template] can not find biz_info -> %s", bk_biz_id)

for host in host_info_result:
host["bk_biz_id"] = bk_biz_id
host["bk_biz_name"] = host["bk_biz_name"] = biz_info[bk_biz_id].get("bk_biz_name")
host["bk_cloud_name"] = cloud_id_name_map.get(host["bk_cloud_id"])
host["bk_cloud_name"] = cloud_id_name_map.get(str(host["bk_cloud_id"]))

return host_info_result

Expand Down Expand Up @@ -564,7 +564,7 @@ def get_host_detail(host_info_list: list, bk_biz_id: int = None):
logger.warning("[get_host_detail] can not find biz_info -> %s", _host["bk_biz_id"])

_host["bk_cloud_name"] = (
cloud_id_name_map.get(_host["bk_cloud_id"], "")
cloud_id_name_map.get(str(_host["bk_cloud_id"]), "")
if _host["bk_cloud_id"] != constants.DEFAULT_CLOUD
else "直连区域"
)
Expand Down
4 changes: 2 additions & 2 deletions apps/iam/handlers/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ def create_instance(cls, instance_id: str, attribute=None) -> Resource:

@classmethod
def create_instances(cls, instance_ids: Union[List[str], Set[str]], attribute=None) -> List[Resource]:
cloud_id_name_map = models.Cloud.cloud_id_name_map()
cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True)
return [
cls.create_instance(instance_id, {"name": cloud_id_name_map.get(int(instance_id))})
cls.create_instance(instance_id, {"name": cloud_id_name_map.get(str(instance_id))})
for instance_id in instance_ids
]

Expand Down
4 changes: 2 additions & 2 deletions apps/node_man/handlers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,12 @@ def retrieve(self, params: Dict[str, Any]):
host_execute_status_list.extend(filter_hosts)

# 补充业务名、管控区域名称
cloud_id_name_map = models.Cloud.cloud_id_name_map()
cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True)
biz_name_map = CmdbHandler.biz_id_name_without_permission()
for host_execute_status in host_execute_status_list:
host_execute_status.update(
bk_biz_name=biz_name_map.get(host_execute_status.get("bk_biz_id")),
bk_cloud_name=cloud_id_name_map.get(host_execute_status["bk_cloud_id"]),
bk_cloud_name=cloud_id_name_map.get(str(host_execute_status["bk_cloud_id"])),
)

tools.JobTools.update_job_statistics(self.data, task_result["status_counter"])
Expand Down
4 changes: 2 additions & 2 deletions apps/node_man/handlers/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,15 +509,15 @@ def migrate_preview(cls, query_params: Dict[str, Any]) -> List[Dict[str, Any]]:
)

# 补充业务名、管控区域名称
cloud_id_name_map = models.Cloud.cloud_id_name_map()
cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True)
biz_name_map = CmdbHandler.biz_id_name_without_permission()

results = []
for action_id, instances in action_instance_map.items():
for instance in instances:
instance.update(
bk_biz_name=biz_name_map.get(instance.get("bk_biz_id")),
bk_cloud_name=cloud_id_name_map.get(instance["bk_cloud_id"]),
bk_cloud_name=cloud_id_name_map.get(str(instance["bk_cloud_id"])),
)

inst_job_type = constants.ACTION_NAME_JOB_TYPE_MAP.get(action_id)
Expand Down
7 changes: 4 additions & 3 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,11 +719,12 @@ class Cloud(models.Model):

@classmethod
@FuncCacheDecorator(cache_time=20 * constants.TimeUnit.SECOND)
def cloud_id_name_map(cls) -> Dict:
def cloud_id_name_map(cls) -> Dict[str, str]:
all_cloud_map = {
cloud.bk_cloud_id: cloud.bk_cloud_name for cloud in cls.objects.all().only("bk_cloud_id", "bk_cloud_name")
str(cloud.bk_cloud_id): cloud.bk_cloud_name
for cloud in cls.objects.all().only("bk_cloud_id", "bk_cloud_name")
}
all_cloud_map[constants.DEFAULT_CLOUD] = str(_("直连区域"))
all_cloud_map[str(constants.DEFAULT_CLOUD)] = str(_("直连区域"))
return all_cloud_map

@classmethod
Expand Down
29 changes: 14 additions & 15 deletions apps/prometheus/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,41 +103,40 @@ def get_histogram_buckets_from_env(env_name):

app_task_engine_running_executes_info = Gauge(
name="app_task_engine_running_executes",
documentation="Number of engine running executes per hostname, per code.",
labelnames=["hostname", "code"],
documentation="Number of engine running executes per code.",
labelnames=["code"],
)

app_task_engine_running_schedules_info = Gauge(
name="app_task_engine_running_schedules",
documentation="Number of engine running schedules per hostname, per code.",
labelnames=["hostname", "code"],
documentation="Number of engine running schedules per code.",
labelnames=["code"],
)

app_task_engine_execute_duration_seconds = Histogram(
name="app_task_engine_execute_duration_seconds",
documentation="Histogram of the time (in seconds) each engine execute per hostname, per code.",
documentation="Histogram of the time (in seconds) each engine execute per code.",
buckets=get_histogram_buckets_from_env("BKAPP_MONITOR_METRICS_ENGINE_BUCKETS"),
labelnames=["hostname", "code"],
labelnames=["code"],
)

app_task_engine_schedule_duration_seconds = Histogram(
name="app_task_engine_schedule_duration_seconds",
documentation="Histogram of the time (in seconds) each engine schedule per hostname, per code.",
documentation="Histogram of the time (in seconds) each engine schedule per code.",
buckets=get_histogram_buckets_from_env("BKAPP_MONITOR_METRICS_ENGINE_BUCKETS"),
labelnames=["hostname", "code"],
labelnames=["code"],
)

app_task_engine_service_run_exceptions_total = Counter(
name="app_task_engine_service_run_exceptions_total",
documentation="Cumulative count of engine service run exceptions "
"per hostname, per code, per exc_type, per exc_code.",
labelnames=["hostname", "code", "exc_type", "exc_code"],
documentation="Cumulative count of engine service run exceptions " "per code, per exc_type, per exc_code.",
labelnames=["code", "exc_type", "exc_code"],
)

app_task_engine_sub_inst_statuses_total = Counter(
name="app_task_engine_sub_inst_statuses_total",
documentation="Cumulative count of engine subscription instance statuses per hostname, per status .",
labelnames=["hostname", "status"],
documentation="Cumulative count of engine subscription instance statuses per status .",
labelnames=["status"],
)

app_task_engine_get_common_data_duration_seconds = Histogram(
Expand Down Expand Up @@ -197,8 +196,8 @@ def get_histogram_buckets_from_env(env_name):

app_core_remote_proxy_info = Gauge(
name="app_core_remote_proxy_info",
documentation="A metric with a constants '1' value labeled by hostname, ip, bk_cloud_id, paramiko_version",
labelnames=["hostname", "ip", "bk_cloud_id", "paramiko_version"],
documentation="A metric with a constants '1' value labeled by proxy_name, proxy_ip, bk_cloud_id, paramiko_version",
labelnames=["proxy_name", "proxy_ip", "bk_cloud_id", "paramiko_version"],
)

app_core_cache_decorator_requests_total = Counter(
Expand Down
75 changes: 75 additions & 0 deletions apps/prometheus/reporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
import os
import time
from typing import Dict, Optional

from bk_monitor_report import MonitorReporter as Reporter
from celery.utils.nodenames import gethostname, host_format
from prometheus_client import REGISTRY, CollectorRegistry, generate_latest
from prometheus_client.parser import text_string_to_metric_families

logger = logging.getLogger("bk-monitor-report")


class MonitorReporter(Reporter):
    """Reporter that stamps every reported sample with process-identifying dimensions.

    Builds on the ``bk_monitor_report`` reporter: in addition to the labels a
    metric sample already carries, each sample is reported with a ``hostname``
    and an ``instance`` dimension (see ``generate_addition_labels``).
    """

    def __init__(
        self,
        data_id: int,
        access_token: str,
        target: str,
        url: str,
        report_interval: int = 60,
        chunk_size: int = 500,
        registry: Optional[CollectorRegistry] = REGISTRY,
        proc_type: str = "",
        instance_tmpl: str = "",
    ):
        """Initialise the base reporter and remember how to name this process.

        :param data_id: forwarded unchanged to the base reporter
        :param access_token: forwarded unchanged to the base reporter
        :param target: forwarded unchanged to the base reporter
        :param url: forwarded unchanged to the base reporter
        :param report_interval: forwarded unchanged to the base reporter
        :param chunk_size: forwarded unchanged to the base reporter; also used here
            as the number of samples per yielded payload
        :param registry: forwarded unchanged to the base reporter; the registry
            whose metrics are serialised on every report
        :param proc_type: process type; only the value ``"celery"`` is treated specially
        :param instance_tmpl: template passed to celery's ``host_format`` to build
            the ``instance`` dimension
        """
        # NOTE(review): the base class is assumed to store these as same-named
        # attributes (data_id, access_token, target, chunk_size, registry) -- confirm.
        super().__init__(data_id, access_token, target, url, report_interval, chunk_size, registry)
        self.instance_tmpl = instance_tmpl
        self.proc_type = proc_type

    def generate_addition_labels(self) -> Dict[str, str]:
        """Build the extra dimensions attached to every sample.

        :return: dict with ``hostname`` (from celery's ``gethostname``) and
            ``instance`` (``instance_tmpl`` rendered by celery's ``host_format``)
        """
        hostname = gethostname()
        # Non-celery processes additionally hand their pid to the template as ``P``.
        # NOTE(review): presumably consumed as a ``%P`` placeholder -- confirm
        # against celery's host_format documentation.
        format_extras: Dict[str, str] = {} if self.proc_type == "celery" else {"P": str(os.getpid())}
        instance = host_format(self.instance_tmpl, **format_extras)
        return {"hostname": hostname, "instance": instance}

    def generate_chunked_report_data(self):
        """Yield report payloads holding at most ``chunk_size`` samples each.

        The registry is serialised to text, parsed back into metric families, and
        every sample becomes one record whose dimensions are the sample's own
        labels updated with the additional labels (additional labels win on a
        name clash). All records of one call share the same millisecond timestamp.
        """
        now_ms = round(time.time() * 1000)
        extra_dimensions = self.generate_addition_labels()

        def empty_payload():
            # A fresh envelope per chunk, so an already-yielded payload is never mutated.
            return {"data_id": self.data_id, "access_token": self.access_token, "data": []}

        payload = empty_payload()
        exposition = generate_latest(self.registry).decode("utf-8")
        all_samples = (
            sample for family in text_string_to_metric_families(exposition) for sample in family.samples
        )

        for emitted, sample in enumerate(all_samples, start=1):
            dimension = sample.labels if sample.labels else {}
            # Supplement the sample's dimensions with the process-level ones.
            dimension.update(extra_dimensions)
            payload["data"].append(
                {
                    "metrics": {sample.name: sample.value},
                    "target": self.target,
                    "dimension": dimension,
                    "timestamp": now_ms,
                }
            )
            # Flush whenever the running sample count reaches a multiple of chunk_size.
            if emitted % self.chunk_size == 0:
                yield payload
                payload = empty_payload()

        # Flush the trailing, partially filled chunk (if any).
        if payload["data"]:
            yield payload
Loading

0 comments on commit 12d638f

Please sign in to comment.