Skip to content

Commit

Permalink
feature: 增加埋点,Trace 接入 (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Aug 26, 2022
1 parent 3bab343 commit e75bf49
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 186 deletions.
148 changes: 0 additions & 148 deletions apps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,151 +8,3 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import json
import threading
from typing import Collection

import MySQLdb
from django.conf import settings
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation import dbapi
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF, DEFAULT_OFF
from opentelemetry.trace import Span, Status, StatusCode


def requests_callback(span: Span, response):
    """Record the BlueKing-style result fields of an HTTP response on ``span``.

    The response body is parsed as JSON. If parsing fails, or the parsed body
    is not a dict, the span is left untouched. Otherwise ``result_code``,
    ``result_message`` and ``result_errors`` are set as span attributes, a
    request id is attached when one can be found, and the span status is set
    to OK when the code is one of ``0``, ``"0"`` or ``"00"`` and to ERROR for
    any other code.

    :param span: span of the outgoing request
    :param response: response object exposing ``json()`` and ``headers``
    """
    try:
        payload = response.json()
    except Exception:  # pylint: disable=broad-except
        return
    if not isinstance(payload, dict):
        return

    # NOTE: ESB responses include a `result` field, but apigateway /
    # IAM backend / search-engine responses do not, so the status below is
    # derived from `code`.
    result_code = payload.get("code", 0)
    span.set_attribute("result_code", result_code)
    span.set_attribute("result_message", payload.get("message", ""))
    span.set_attribute("result_errors", str(payload.get("errors", "")))

    # Best effort: a failure while looking up the request id must never
    # prevent the span status from being set.
    try:
        request_id = (
            # new esb and apigateway
            response.headers.get("x-bkapi-request-id")
            # iam backend
            or response.headers.get("x-request-id")
            # old esb
            or payload.get("request_id", "")
        )
        if request_id:
            span.set_attribute("bk.request_id", request_id)
    except Exception:  # pylint: disable=broad-except
        pass

    succeeded = result_code in (0, "0", "00")
    span.set_status(Status(StatusCode.OK if succeeded else StatusCode.ERROR))


def django_response_hook(span, request, response):
    """Record the BlueKing-style result fields of a Django response on ``span``.

    The payload is taken from ``response.data`` when that attribute exists,
    otherwise ``response.content`` is parsed as JSON. If parsing fails, or the
    payload is not a dict, the span is left untouched. Otherwise
    ``result_code``, ``result_message`` and ``result_errors`` are set as span
    attributes and the span status follows the truthiness of the payload's
    ``result`` field (treated as ``True`` when absent).

    :param span: span of the incoming request
    :param request: the request being handled (unused)
    :param response: the response produced for the request
    """
    if hasattr(response, "data"):
        result = response.data
    else:
        try:
            result = json.loads(response.content)
        except Exception:  # pylint: disable=broad-except
            return
    if not isinstance(result, dict):
        return

    span.set_attribute("result_code", result.get("code", 0))
    span.set_attribute("result_message", result.get("message", ""))
    # `errors` may be a dict or list, which is not a valid span attribute
    # value; stringify it, exactly as requests_callback does.
    span.set_attribute("result_errors", str(result.get("errors", "")))

    succeeded = result.get("result", True)
    span.set_status(Status(StatusCode.OK if succeeded else StatusCode.ERROR))


class LazyBatchSpanProcessor(BatchSpanProcessor):
    """Batch span processor whose worker thread is started on first use.

    The constructor stops and discards the worker thread created by the
    parent class; ``on_end`` creates a fresh daemon worker the first time a
    span ends.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Stop the default worker thread: flag it as done, wake it up so it
        # notices the flag, then wait for it to exit.
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker_thread.join()
        # Re-arm the processor; `None` marks "no worker running yet".
        self.done = False
        self.worker_thread = None

    def on_end(self, span: ReadableSpan) -> None:
        """Start the worker thread if it is not running yet, then queue ``span``."""
        # NOTE(review): this check-then-start is not synchronized; confirm
        # whether concurrent first calls can happen in this deployment.
        if self.worker_thread is None:
            lazy_worker = threading.Thread(target=self.worker, daemon=True)
            self.worker_thread = lazy_worker
            lazy_worker.start()
        super().on_end(span)


class BluekingInstrumentor(BaseInstrumentor):
    """Instrumentor that wires up tracing for Django, Redis, requests,
    Celery, logging and the MySQLdb ``connect`` call in one go."""

    # Set to True once `_instrument` has run so a second call is a no-op.
    has_instrument = False

    def _uninstrument(self, **kwargs):
        """Do nothing: un-instrumenting is not implemented."""

    def _instrument(self, **kwargs):
        """Create the tracer provider and instrument the libraries."""
        if self.has_instrument:
            return

        exporter = OTLPSpanExporter(endpoint=settings.OTLP_GRPC_HOST)
        processor = LazyBatchSpanProcessor(exporter)

        # Periodic tasks are never sampled: use ALWAYS_OFF when
        # settings.IS_CELERY_BEAT is set, DEFAULT_OFF otherwise.
        sampler = ALWAYS_OFF if settings.IS_CELERY_BEAT else DEFAULT_OFF

        resource = Resource.create(
            {
                "service.name": settings.APP_CODE,
                "bk_data_id": settings.OTLP_BK_DATA_ID,
            }
        )
        provider = TracerProvider(resource=resource, sampler=sampler)
        provider.add_span_processor(processor)
        trace.set_tracer_provider(provider)

        DjangoInstrumentor().instrument(response_hook=django_response_hook)
        RedisInstrumentor().instrument()
        # ElasticsearchInstrumentor().instrument()
        RequestsInstrumentor().instrument(tracer_provider=provider, span_callback=requests_callback)
        CeleryInstrumentor().instrument(tracer_provider=provider)
        LoggingInstrumentor().instrument()

        # Connection-attribute mapping handed to dbapi.wrap_connect.
        mysql_connection_attributes = {
            "database": "db",
            "port": "port",
            "host": "host",
            "user": "user",
        }
        dbapi.wrap_connect(
            __name__,
            MySQLdb,
            "connect",
            "mysql",
            mysql_connection_attributes,
            tracer_provider=provider,
        )
        self.has_instrument = True

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return an empty list: no instrumentation dependencies are declared."""
        return []
2 changes: 1 addition & 1 deletion apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def create_subscription(self, request):
node_type=scope["node_type"],
nodes=scope["nodes"],
target_hosts=params.get("target_hosts"),
from_system="blueking",
from_system=params["bk_app_code"] or "blueking",
enable=enable,
is_main=params.get("is_main", False),
creator=params["bk_username"],
Expand Down
3 changes: 0 additions & 3 deletions apps/node_man/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from django.conf import settings
from django.db import ProgrammingError, connection

from apps import BluekingInstrumentor
from common.log import logger


Expand All @@ -30,8 +29,6 @@ def ready(self):

from apps.node_man.models import GlobalSettings

if settings.ENABLE_OTEL_TRACE:
BluekingInstrumentor().instrument()
if GlobalSettings._meta.db_table not in connection.introspection.table_names():
# 初次部署表不存在时跳过DB写入操作
logger.info(f"{GlobalSettings._meta.db_table} not exists, skip fetch_esb_api_key before migrate.")
Expand Down
7 changes: 5 additions & 2 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
QueryGlobalSettingsException,
UrlNotReachableError,
)
from apps.prometheus.models import export_job_prometheus_mixin
from apps.prometheus.models import (
export_job_prometheus_mixin,
export_subscription_prometheus_mixin,
)
from apps.utils import files, orm, translation
from common.log import logger
from pipeline.parser import PipelineParser
Expand Down Expand Up @@ -1787,7 +1790,7 @@ def __str__(self):
)


class Subscription(orm.SoftDeleteModel):
class Subscription(export_subscription_prometheus_mixin(), orm.SoftDeleteModel):
"""订阅"""

class ObjectType(object):
Expand Down
48 changes: 48 additions & 0 deletions apps/prometheus/middlewares.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from django.conf import settings
from django.utils.deprecation import MiddlewareMixin
from django_prometheus.conf import NAMESPACE
from django_prometheus.middleware import Metrics
from prometheus_client import Counter


class NodeManMetrics(Metrics):
    """Metrics collection holding the per-view, per-app_code request counter."""

    def register(self):
        """Register the ``requests_by_view_app_code`` counter on this instance.

        Only this one counter is registered here; the parent ``register`` is
        not called.
        """
        label_names = ["view", "app_code"]
        counter = self.register_metric(
            Counter,
            "django_http_requests_by_view_app_code",
            "Count of requests by view, app_code.",
            label_names,
            namespace=NAMESPACE,
        )
        self.requests_by_view_app_code = counter


class NodeManAfterMiddleware(MiddlewareMixin):
    """Middleware counting requests per resolved view and calling app code."""

    # Metrics collection class; its shared instance is fetched in __init__.
    metrics_cls = NodeManMetrics

    def __init__(self, get_response=None):
        super().__init__(get_response)
        self.metrics = self.metrics_cls.get_instance()

    def label_metric(self, metric, request, response=None, **labels):
        """Return ``metric`` bound to ``labels``, or ``metric`` itself when no labels are given."""
        return metric.labels(**labels) if labels else metric

    def _app_code(self, request):
        """Return the ``HTTP_BK_APP_CODE`` entry of ``request.META``, falling back to ``settings.APP_CODE``."""
        return request.META.get("HTTP_BK_APP_CODE", settings.APP_CODE)

    def process_view(self, request, view_func, *view_args, **view_kwargs):
        """Increment the per-view, per-app_code request counter.

        Nothing is counted when the request has no resolver match. Always
        returns ``None``.
        """
        # `resolver_match` can be present but None, so checking only for the
        # attribute is not enough; treat None the same as a missing attribute.
        resolver_match = getattr(request, "resolver_match", None)
        if resolver_match is None:
            return
        name = resolver_match.view_name or "<unnamed view>"
        self.label_metric(
            self.metrics.requests_by_view_app_code, request, view=name, app_code=self._app_code(request)
        ).inc()
62 changes: 48 additions & 14 deletions apps/prometheus/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
specific language governing permissions and limitations under the License.
"""

import typing

from django_prometheus.conf import NAMESPACE
from prometheus_client import Counter

from apps.node_man import constants
jobs_by_op_type_operate_step = Counter(
"django_app_jobs_total_by_operate_step",
"Count of jobs by operate, step.",
["operate", "step"],
namespace=NAMESPACE,
)


job_model_updates = Counter(
"django_job_model_updates_total",
"Number of update operations by model.",
["job_type", "status"],
subscriptions_by_object_node_category = Counter(
"django_app_subscriptions_total_by_object_node_category",
"Count of subscriptions by object, node, category.",
["object", "node", "category"],
namespace=NAMESPACE,
)

Expand All @@ -26,18 +34,44 @@ def export_job_prometheus_mixin():
"""任务模型埋点"""

class Mixin:
_original_status = None
status = None
job_type = None
job_type: str = None
task_id_list: typing.List[int] = None
_origin_task_id_list: typing.List[int] = None

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._origin_status = self.status
self._origin_task_id_list = self.task_id_list

def unpacking_job_type(self) -> typing.Tuple[str, str]:
if not self.job_type:
return "default", "default"
operate_untreated, step = self.job_type.rsplit("_", 1)
operate = operate_untreated.replace("MAIN_", "")
return operate, step

def inc(self):
operate, step = self.unpacking_job_type()
jobs_by_op_type_operate_step.labels(operate, step).inc()

def _do_insert(self, *args, **kwargs):
self.inc()
return super()._do_insert(*args, **kwargs)

return Mixin


def export_subscription_prometheus_mixin():
"""任务模型埋点"""

class Mixin:
object_type: typing.Optional[str] = None
node_type: typing.Optional[str] = None
category: typing.Optional[str] = None

def _do_update(self, *args, **kwargs):
# 任务状态有变更,且目标状态不是处理中的,则进行埋点
if self._origin_status != self.status and self.status not in constants.JobStatusType.PROCESSING_STATUS:
job_model_updates.labels(self.job_type, self.status).inc()
return super()._do_update(*args, **kwargs)
def _do_insert(self, *args, **kwargs):
subscriptions_by_object_node_category.labels(
self.object_type, self.node_type, self.category or "subscription"
).inc()
return super()._do_insert(*args, **kwargs)

return Mixin
22 changes: 15 additions & 7 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"django_dbconn_retry",
# django_prometheus
"django_prometheus",
"blueapps.opentelemetry.instrument_app",
)

# 这里是默认的中间件,大部分情况下,不需要改动
Expand All @@ -86,7 +87,6 @@
# 蓝鲸静态资源服务
"whitenoise.middleware.WhiteNoiseMiddleware",
# Auth middleware
# 'blueapps.account.middlewares.BkJwtLoginRequiredMiddleware',
# 'blueapps.account.middlewares.WeixinLoginRequiredMiddleware',
"blueapps.account.middlewares.BkJwtLoginRequiredMiddleware",
"blueapps.account.middlewares.LoginRequiredMiddleware",
Expand All @@ -100,15 +100,23 @@

# 添加django_prometheus中间件
MIDDLEWARE = (
("django_prometheus.middleware.PrometheusBeforeMiddleware",)
("blueapps.opentelemetry.metrics.middlewares.SaaSMetricsBeforeMiddleware",)
+ MIDDLEWARE
+ ("django_prometheus.middleware.PrometheusAfterMiddleware",)
+ (
"blueapps.opentelemetry.metrics.middlewares.SaaSMetricsAfterMiddleware",
"apps.prometheus.middlewares.NodeManAfterMiddleware",
)
)

# open telemetry
ENABLE_OTEL_TRACE = get_type_env("ENABLE_OTEL_TRACE", _type=bool, default=False)
OTLP_GRPC_HOST = get_type_env("OTLP_GRPC_HOST", _type=str, default="")
OTLP_BK_DATA_ID = get_type_env("OTLP_BK_DATA_ID", _type=int, default=0)
# ===============================================================================
# 可观测
# ===============================================================================

ENABLE_OTEL_TRACE = env.BKAPP_ENABLE_OTEL_TRACE
BKAPP_OTEL_SAMPLER = env.BKAPP_OTEL_SAMPLER
BK_APP_OTEL_INSTRUMENT_DB_API = env.BKAPP_OTEL_INSTRUMENT_DB_API
BKAPP_OTEL_GRPC_HOST = env.BKAPP_OTEL_GRPC_HOST
BKAPP_OTEL_BK_DATA_TOKEN = env.BKAPP_OTEL_BK_DATA_TOKEN

# 单元测试豁免登录
if "test" in sys.argv:
Expand Down
Loading

0 comments on commit e75bf49

Please sign in to comment.