From ea5fbff70023ec416b2afa112047af1ca61fa1fa Mon Sep 17 00:00:00 2001 From: liujun Date: Fri, 14 Jun 2024 19:24:12 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=E7=B3=BB=E7=BB=9F=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=89=A7=E8=A1=8C=E7=8A=B6=E6=80=81=E8=87=AA=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E8=83=BD=E5=8A=9B=E6=94=AF=E6=8C=81=20#7457?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/default.py | 1 + config/urls_custom.py | 10 +- gcloud/contrib/monitor/__init__.py | 14 +++ gcloud/contrib/monitor/apps.py | 19 ++++ gcloud/contrib/monitor/urls.py | 22 ++++ gcloud/contrib/monitor/views.py | 156 +++++++++++++++++++++++++++++ 6 files changed, 217 insertions(+), 5 deletions(-) create mode 100755 gcloud/contrib/monitor/__init__.py create mode 100644 gcloud/contrib/monitor/apps.py create mode 100644 gcloud/contrib/monitor/urls.py create mode 100644 gcloud/contrib/monitor/views.py diff --git a/config/default.py b/config/default.py index 5c40605c0a..03d7703df1 100644 --- a/config/default.py +++ b/config/default.py @@ -115,6 +115,7 @@ "blueapps.opentelemetry.instrument_app", "apigw_manager.apigw", "bk_notice_sdk", + "gcloud.contrib.monitor", ) # 这里是默认的中间件,大部分情况下,不需要改动 diff --git a/config/urls_custom.py b/config/urls_custom.py index 6ed3c5ecef..e0b0ceb409 100644 --- a/config/urls_custom.py +++ b/config/urls_custom.py @@ -10,12 +10,11 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" -from rest_framework import permissions -from drf_yasg.views import get_schema_view -from drf_yasg import openapi -from django.conf.urls import include, url from django.conf import settings - +from django.conf.urls import include, url +from drf_yasg import openapi +from drf_yasg.views import get_schema_view +from rest_framework import permissions # 用户自定义 urlconf urlpatterns_custom = [ @@ -41,6 +40,7 @@ url(r"^plugin_service/", include("plugin_service.urls")), url(r"^mako_operations/", include("gcloud.mako_template_helper.urls")), url(r"^engine_admin/", include("pipeline.contrib.engine_admin.urls")), + url(r"^monitor/", include("gcloud.contrib.monitor.urls")), ] schema_view = get_schema_view( diff --git a/gcloud/contrib/monitor/__init__.py b/gcloud/contrib/monitor/__init__.py new file mode 100755 index 0000000000..b0c2741ac8 --- /dev/null +++ b/gcloud/contrib/monitor/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. 
+""" + +default_app_config = "gcloud.contrib.monitor.apps.MonitorConfig" diff --git a/gcloud/contrib/monitor/apps.py b/gcloud/contrib/monitor/apps.py new file mode 100644 index 0000000000..77bbda7575 --- /dev/null +++ b/gcloud/contrib/monitor/apps.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.apps import AppConfig + + +class MonitorConfig(AppConfig): + name = "gcloud.contrib.monitor" + verbose_name = "GcloudContribMonitor" diff --git a/gcloud/contrib/monitor/urls.py b/gcloud/contrib/monitor/urls.py new file mode 100644 index 0000000000..4ebebef3fe --- /dev/null +++ b/gcloud/contrib/monitor/urls.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.conf.urls import url + +from gcloud.contrib.monitor import views + +urlpatterns = [ + url(r"^get_failed_task/$", views.get_failed_task), + url(r"^get_executing_task/$", views.get_executing_task), + url(r"^get_schedule_times/$", views.get_schedule_times), +] diff --git a/gcloud/contrib/monitor/views.py b/gcloud/contrib/monitor/views.py new file mode 100644 index 0000000000..a98a8a1462 --- /dev/null +++ b/gcloud/contrib/monitor/views.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. 
+""" + +from django.db import connection +from django.http import JsonResponse +from django.views.decorators.http import require_GET + +from gcloud.iam_auth.intercept import iam_intercept +from gcloud.iam_auth.view_interceptors.statistics import StatisticsViewInpterceptor + + +@require_GET +@iam_intercept(StatisticsViewInpterceptor()) +def get_failed_task(request): + """ + 获取失败任务 + """ + limit = int(request.GET.get("limit", 100)) + offset = int(request.GET.get("offset", 0)) + failed_sql = f"""SELECT + tt.id AS task_id, + cp.NAME AS project_name, + pp.NAME AS task_name + FROM + `taskflow3_taskflowinstance` AS tt, + `core_project` AS cp, + `pipeline_pipelineinstance` AS pp, + `eri_state` AS es + WHERE + pp.instance_id = es.root_id + AND tt.pipeline_instance_id = pp.id + AND tt.project_id = cp.id + AND pp.is_deleted = 0 + AND pp.is_expired = 0 + AND pp.is_finished = 0 + AND pp.is_revoked = 0 + AND pp.is_started = 1 + AND es.NAME = "FAILED" + ORDER BY + pp.id DESC + LIMIT + {offset},{limit}""" + with connection.cursor() as cursor: + cursor.execute(failed_sql) + failed_tasks = [ + {"task_id": item[0], "project_name": item[1], "task_name": item[2]} for item in cursor.fetchall() + ] + return JsonResponse({"result": True, "data": failed_tasks}) + + +@require_GET +@iam_intercept(StatisticsViewInpterceptor()) +def get_executing_task(request): + """ + 获取执行中任务 + """ + limit = int(request.GET.get("limit", 100)) + offset = int(request.GET.get("offset", 0)) + + def get_data(offset, limit): + failed_sql = f"""SELECT + pp.id + FROM + `pipeline_pipelineinstance` AS pp, + `eri_state` AS es + WHERE + pp.instance_id = es.root_id + AND pp.is_deleted = 0 + AND pp.is_expired = 0 + AND pp.is_finished = 0 + AND pp.is_revoked = 0 + AND pp.is_started = 1 + AND es.NAME = "FAILED" + ORDER BY + pp.id DESC + LIMIT + {offset}, {limit}""" + + with connection.cursor() as cursor: + cursor.execute(failed_sql) + failed_task_ids = [item[0] for item in cursor.fetchall()] + no_failed_sql = f"""SELECT + 
pp.id, + tt.id AS task_id, + cp.NAME AS project_name, + pp.NAME AS task_name + FROM + `taskflow3_taskflowinstance` AS tt, + `core_project` AS cp, + `pipeline_pipelineinstance` AS pp + WHERE + tt.pipeline_instance_id = pp.id + AND tt.project_id = cp.id + AND pp.is_deleted = 0 + AND pp.is_expired = 0 + AND pp.is_finished = 0 + AND pp.is_revoked = 0 + AND pp.is_started = 1 + ORDER BY + pp.id DESC + LIMIT + {offset}, {limit}""" + with connection.cursor() as cursor: + cursor.execute(no_failed_sql) + no_failed_tasks = [ + {"task_id": item[1], "project_name": item[2], "task_name": item[3]} + for item in cursor.fetchall() + if item[0] not in failed_task_ids + ] + return no_failed_tasks + + no_failed_tasks = [] + for i in range(offset, 5 * limit + offset, limit): + if len(no_failed_tasks) < limit: + no_failed_tasks.extend(get_data(i, limit)) + return JsonResponse({"result": True, "data": no_failed_tasks}) + + +@require_GET +@iam_intercept(StatisticsViewInpterceptor()) +def get_schedule_times(request): + """ + 获取调度次数 + """ + limit = int(request.GET.get("limit", 100)) + offset = int(request.GET.get("offset", 0)) + schedule_times_sql = f"""SELECT + pp.id, + pp.creator, + esc.schedule_times + FROM + eri_schedule AS esc, + eri_state AS es, + pipeline_pipelineinstance AS pp + WHERE + esc.node_id = es.node_id + AND es.root_id = pp.instance_id + AND esc.scheduling = 0 + ORDER BY + esc.schedule_times DESC + LIMIT + {offset},{limit}""" + with connection.cursor() as cursor: + cursor.execute(schedule_times_sql) + schedule_times = [{"id": item[0], "creator": item[1], "schedule_times": item[2]} for item in cursor.fetchall()] + return JsonResponse({"result": True, "data": schedule_times}) From 133a47f698c4fe078d87eb04e76142ef29b8352f Mon Sep 17 00:00:00 2001 From: liujun Date: Fri, 28 Jun 2024 11:11:27 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20=E7=B3=BB=E7=BB=9F=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=89=A7=E8=A1=8C=E7=8A=B6=E6=80=81=E8=87=AA=E7=9B=91?= 
=?UTF-8?q?=E6=8E=A7=E8=83=BD=E5=8A=9B=E6=94=AF=E6=8C=81=20#7457?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcloud/contrib/monitor/views.py | 207 ++++++++++++++++---------------- 1 file changed, 102 insertions(+), 105 deletions(-) diff --git a/gcloud/contrib/monitor/views.py b/gcloud/contrib/monitor/views.py index a98a8a1462..06f5379dc8 100644 --- a/gcloud/contrib/monitor/views.py +++ b/gcloud/contrib/monitor/views.py @@ -11,12 +11,15 @@ specific language governing permissions and limitations under the License. """ -from django.db import connection +from django.db.models import Q from django.http import JsonResponse +from django.utils import timezone from django.views.decorators.http import require_GET +from pipeline.eri.models import Schedule, State from gcloud.iam_auth.intercept import iam_intercept from gcloud.iam_auth.view_interceptors.statistics import StatisticsViewInpterceptor +from gcloud.taskflow3.models import TaskFlowInstance @require_GET @@ -27,34 +30,30 @@ def get_failed_task(request): """ limit = int(request.GET.get("limit", 100)) offset = int(request.GET.get("offset", 0)) - failed_sql = f"""SELECT - tt.id AS task_id, - cp.NAME AS project_name, - pp.NAME AS task_name - FROM - `taskflow3_taskflowinstance` AS tt, - `core_project` AS cp, - `pipeline_pipelineinstance` AS pp, - `eri_state` AS es - WHERE - pp.instance_id = es.root_id - AND tt.pipeline_instance_id = pp.id - AND tt.project_id = cp.id - AND pp.is_deleted = 0 - AND pp.is_expired = 0 - AND pp.is_finished = 0 - AND pp.is_revoked = 0 - AND pp.is_started = 1 - AND es.NAME = "FAILED" - ORDER BY - pp.id DESC - LIMIT - {offset},{limit}""" - with connection.cursor() as cursor: - cursor.execute(failed_sql) - failed_tasks = [ - {"task_id": item[0], "project_name": item[1], "task_name": item[2]} for item in cursor.fetchall() - ] + st = timezone.now() - timezone.timedelta(days=30) + start_time = request.GET.get("start_time", st) + states = 
State.objects.filter(name="FAILED", started_time__gte=start_time).values("root_id") + root_ids = [state["root_id"] for state in states] + tasks = ( + TaskFlowInstance.objects.select_related("project", "pipeline_instance") + .filter( + pipeline_instance__is_deleted=False, + pipeline_instance__is_expired=False, + pipeline_instance__is_finished=False, + pipeline_instance__is_revoked=False, + pipeline_instance__is_started=True, + pipeline_instance__instance_id__in=root_ids, + ) + .values("id", "project__name", "pipeline_instance__name")[offset : limit + offset] + ) + failed_tasks = [ + { + "task_id": task["id"], + "project_name": task["project__name"], + "task_name": task["pipeline_instance__name"], + } + for task in tasks + ] return JsonResponse({"result": True, "data": failed_tasks}) @@ -66,64 +65,53 @@ def get_executing_task(request): """ limit = int(request.GET.get("limit", 100)) offset = int(request.GET.get("offset", 0)) + st = timezone.now() - timezone.timedelta(days=30) + start_time = request.GET.get("start_time", st) + failed_states = State.objects.filter(name="FAILED", started_time__gte=start_time).values("root_id") + failed_root_ids = [state["root_id"] for state in failed_states] + # 失败的任务 + failed_tasks = ( + TaskFlowInstance.objects.select_related("project", "pipeline_instance") + .filter( + pipeline_instance__is_deleted=False, + pipeline_instance__is_expired=False, + pipeline_instance__is_finished=False, + pipeline_instance__is_revoked=False, + pipeline_instance__is_started=True, + pipeline_instance__instance_id__in=failed_root_ids, + ) + .values( + "pipeline_instance__id", + )[offset : limit + offset] + ) + failed_task_ids = [task["pipeline_instance__id"] for task in failed_tasks] - def get_data(offset, limit): - failed_sql = f"""SELECT - pp.id - FROM - `pipeline_pipelineinstance` AS pp, - `eri_state` AS es - WHERE - pp.instance_id = es.root_id - AND pp.is_deleted = 0 - AND pp.is_expired = 0 - AND pp.is_finished = 0 - AND pp.is_revoked = 0 - AND 
pp.is_started = 1 - AND es.NAME = "FAILED" - ORDER BY - pp.id DESC - LIMIT - {offset}, {limit}""" - - with connection.cursor() as cursor: - cursor.execute(failed_sql) - failed_task_ids = [item[0] for item in cursor.fetchall()] - no_failed_sql = f"""SELECT - pp.id, - tt.id AS task_id, - cp.NAME AS project_name, - pp.NAME AS task_name - FROM - `taskflow3_taskflowinstance` AS tt, - `core_project` AS cp, - `pipeline_pipelineinstance` AS pp - WHERE - tt.pipeline_instance_id = pp.id - AND tt.project_id = cp.id - AND pp.is_deleted = 0 - AND pp.is_expired = 0 - AND pp.is_finished = 0 - AND pp.is_revoked = 0 - AND pp.is_started = 1 - ORDER BY - pp.id DESC - LIMIT - {offset}, {limit}""" - with connection.cursor() as cursor: - cursor.execute(no_failed_sql) - no_failed_tasks = [ - {"task_id": item[1], "project_name": item[2], "task_name": item[3]} - for item in cursor.fetchall() - if item[0] not in failed_task_ids - ] - return no_failed_tasks - - no_failed_tasks = [] - for i in range(offset, 5 * limit + offset, limit): - if len(no_failed_tasks) < limit: - no_failed_tasks.extend(get_data(i, limit)) - return JsonResponse({"result": True, "data": no_failed_tasks}) + states = State.objects.filter(~Q(name="FAILED")).filter(started_time__gte=start_time).values("root_id") + root_ids = [state["root_id"] for state in states] + # 非失败的任务 + tasks = ( + TaskFlowInstance.objects.select_related("project", "pipeline_instance") + .filter( + pipeline_instance__is_deleted=False, + pipeline_instance__is_expired=False, + pipeline_instance__is_finished=False, + pipeline_instance__is_revoked=False, + pipeline_instance__is_started=True, + pipeline_instance__instance_id__in=root_ids, + ) + .values("id", "project__name", "pipeline_instance__name", "pipeline_instance__id")[offset : limit + offset] + ) + # 求差获得执行中的任务 + executing_tasks = [ + { + "task_id": task["id"], + "project_name": task["project__name"], + "task_name": task["pipeline_instance__name"], + } + for task in tasks + if 
task["pipeline_instance__id"] not in failed_task_ids + ] + return JsonResponse({"result": True, "data": executing_tasks}) @@ -134,23 +122,32 @@ def get_schedule_times(request): """ limit = int(request.GET.get("limit", 100)) offset = int(request.GET.get("offset", 0)) - schedule_times_sql = f"""SELECT - pp.id, - pp.creator, - esc.schedule_times - FROM - eri_schedule AS esc, - eri_state AS es, - pipeline_pipelineinstance AS pp - WHERE - esc.node_id = es.node_id - AND es.root_id = pp.instance_id - AND esc.scheduling = 0 - ORDER BY - esc.schedule_times DESC - LIMIT - {offset},{limit}""" - with connection.cursor() as cursor: - cursor.execute(schedule_times_sql) - schedule_times = [{"id": item[0], "creator": item[1], "schedule_times": item[2]} for item in cursor.fetchall()] + st = timezone.now() - timezone.timedelta(days=30) + start_time = request.GET.get("start_time", st) + schedules = Schedule.objects.filter(scheduling=False).values("node_id", "schedule_times") + schedules = {schedule["node_id"]: schedule["schedule_times"] for schedule in schedules} + states = State.objects.filter(started_time__gte=start_time, node_id__in=list(schedules.keys())).values( + "node_id", "root_id" + ) + root_ids = {state["root_id"]: schedules[state["node_id"]] for state in states} + tasks = ( + TaskFlowInstance.objects.select_related("project", "pipeline_instance") + .filter(pipeline_instance__instance_id__in=list(root_ids.keys())) + .values( + "id", + "project__name", + "pipeline_instance__name", + "pipeline_instance__creator", + "pipeline_instance__instance_id", + )[offset : offset + limit] + ) + schedule_times = [ + { + "id": task["id"], + "project_name": task["project__name"], + "creator": task["pipeline_instance__creator"], + "schedule_times": root_ids[task["pipeline_instance__instance_id"]], + } + for task in tasks + ] return JsonResponse({"result": True, "data": schedule_times}) From dead6b4197ebae6ead51e01b07188ede333b475c Mon Sep 17 00:00:00 2001 From: lTimej Date: Fri, 5
Jul 2024 11:35:31 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20=E7=B3=BB=E7=BB=9F=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=89=A7=E8=A1=8C=E7=8A=B6=E6=80=81=E8=87=AA=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E8=83=BD=E5=8A=9B=E6=94=AF=E6=8C=81=20#7457?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gcloud/contrib/monitor/urls.py | 2 ++ gcloud/contrib/monitor/views.py | 45 +++++++++++++++++++++++++++++++++ requirements.txt | 2 ++ 3 files changed, 49 insertions(+) diff --git a/gcloud/contrib/monitor/urls.py b/gcloud/contrib/monitor/urls.py index 4ebebef3fe..493ba34bf3 100644 --- a/gcloud/contrib/monitor/urls.py +++ b/gcloud/contrib/monitor/urls.py @@ -19,4 +19,6 @@ url(r"^get_failed_task/$", views.get_failed_task), url(r"^get_executing_task/$", views.get_executing_task), url(r"^get_schedule_times/$", views.get_schedule_times), + url(r"^get_mq_overview/$", views.get_mq_overview), + url(r"^get_mq_data/$", views.get_mq_data), ] diff --git a/gcloud/contrib/monitor/views.py b/gcloud/contrib/monitor/views.py index 06f5379dc8..66d3d7fb63 100644 --- a/gcloud/contrib/monitor/views.py +++ b/gcloud/contrib/monitor/views.py @@ -11,6 +11,7 @@ specific language governing permissions and limitations under the License. 
""" +import pyrabbit2 from django.db.models import Q from django.http import JsonResponse from django.utils import timezone @@ -151,3 +152,47 @@ def get_schedule_times(request): for task in tasks ] return JsonResponse({"result": True, "data": schedule_times}) + + +@require_GET +@iam_intercept(StatisticsViewInpterceptor()) +def get_mq_overview(request): + """ + 获取mq总览 + """ + data = {} + cl = pyrabbit2.Client("localhost:15672", "guest", "guest") + overview = cl.get_overview() + data = { + "totals": { + "ready": overview["queue_totals"]["messages_ready"], + "unacked": overview["queue_totals"]["messages_unacknowledged"], + "total": overview["queue_totals"]["messages"], + }, + "global_totals": overview["object_totals"], + "nodes": cl.get_nodes(), + } + return JsonResponse({"result": True, "data": data}) + + +@require_GET +@iam_intercept(StatisticsViewInpterceptor()) +def get_mq_data(request): + """ + 获取mq数据 + """ + cl = pyrabbit2.Client("localhost:15672", "guest", "guest") + data = { + vhost: [ + { + "vhost": vhost, + "queue_name": queue["name"], + "message_count": queue["messages"], + "queue_state": queue["state"], + "messages": cl.get_messages(vhost, queue["name"], count=queue["messages"], requeue=True), + } + for queue in cl.get_queues(vhost=vhost) + ] + for vhost in cl.get_vhost_names() + } + return JsonResponse({"result": True, "data": data}) diff --git a/requirements.txt b/requirements.txt index 7f482bc6eb..47a0f41419 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,6 +36,8 @@ bkstorages==1.0.1 ujson==4.1.0 django-dbconn-retry==0.1.5 pydantic==1.9.1 +pyrabbit2==1.0.7 + # monitor django-prometheus==2.1.0