feature: periodic task to sync process status (close #380)
iSecloud authored and zhangzhw8 committed Dec 24, 2021
1 parent a3bb8f5 commit 4bf6c4d
Showing 10 changed files with 258 additions and 101 deletions.
15 changes: 14 additions & 1 deletion apps/backend/tests/components/collections/gse/utils.py
@@ -17,7 +17,6 @@

GSE_CLIENT_MOCK_PATH = "apps.backend.api.gse.get_client_by_user"


GSE_TASK_RESULT = {
"success": [
{
@@ -80,6 +79,18 @@
},
}

MOCK_GET_PROC_STATUS = {
"proc_infos": [
{
"host": {"ip": "127.0.0.1", "bk_cloud_id": 0},
"status": 1,
"version": "1.1.13",
"isauto": True,
"meta": {"name": "test_proc", "namespace": "nodeman", "labels": {"proc_name": "test_proc"}},
}
]
}


class GseMockClient(object):
def __init__(
@@ -88,9 +99,11 @@ def __init__(
unregister_proc_info_return=None,
operate_proc_return=None,
get_proc_operate_result_return=None,
get_proc_status_return=None,
):
self.gse = MagicMock()
self.gse.update_proc_info = MagicMock(return_value=update_proc_info_return)
self.gse.unregister_proc_info = MagicMock(return_value=unregister_proc_info_return)
self.gse.operate_proc = MagicMock(return_value=operate_proc_return)
self.gse.get_proc_operate_result = MagicMock(return_value=get_proc_operate_result_return)
self.gse.get_proc_status = MagicMock(return_value=get_proc_status_return)
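
For reference, a rough sketch of how the extended mock could be wired up in a test, assuming the usual `mock.patch` pattern against `GSE_CLIENT_MOCK_PATH`; the surrounding test harness is not part of this diff:

```python
from unittest.mock import patch

from apps.backend.tests.components.collections.gse.utils import (
    GSE_CLIENT_MOCK_PATH,
    GseMockClient,
    MOCK_GET_PROC_STATUS,
)

# Mock client whose gse.get_proc_status returns the canned response above.
mock_client = GseMockClient(get_proc_status_return=MOCK_GET_PROC_STATUS)

# Assumed usage: any code resolving the GSE client via get_client_by_user gets the mock.
with patch(GSE_CLIENT_MOCK_PATH, return_value=mock_client):
    kwargs = {
        "meta": {"namespace": "nodeman", "name": "test_proc", "labels": {"proc_name": "test_proc"}},
        "hosts": [{"ip": "127.0.0.1", "bk_cloud_id": 0}],
    }
    proc_infos = mock_client.gse.get_proc_status(kwargs)["proc_infos"]
    assert proc_infos[0]["meta"]["name"] == "test_proc"
```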
2 changes: 1 addition & 1 deletion apps/node_man/constants.py
@@ -505,7 +505,7 @@ def get_choices(cls):
# Periodic task related
QUERY_EXPIRED_INFO_LENS = 2000
QUERY_AGENT_STATUS_HOST_LENS = 500
QUERY_PROC_STATUS_HOST_LENS = 500
QUERY_PROC_STATUS_HOST_LENS = 2000
QUERY_CMDB_LIMIT = 500
QUERY_CMDB_MODULE_LIMIT = 500
QUERY_CLOUD_LIMIT = 200
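
The larger batch size lines up with the API timings recorded in the `query_proc_status` docstring further down (~0.9 s per call at 2000 hosts). A quick illustrative estimate, using a hypothetical 100,000-host install (my figures, not from the commit):

```python
hosts = 100_000            # hypothetical install size
batch = 2_000              # QUERY_PROC_STATUS_HOST_LENS
calls = hosts / batch      # 50 GSE calls per plugin name
api_seconds = calls * 0.9  # ~45 s of GSE time per synced plugin
```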
4 changes: 2 additions & 2 deletions apps/node_man/management/commands/sync_plugin_status.py
@@ -12,9 +12,9 @@

from django.core.management.base import BaseCommand

from apps.node_man.periodic_tasks import sync_proc_status_task
from apps.node_man.periodic_tasks import sync_proc_status_periodic_task


class Command(BaseCommand):
def handle(self, **kwargs):
sync_proc_status_task()
sync_proc_status_periodic_task()
2 changes: 1 addition & 1 deletion apps/node_man/periodic_tasks/__init__.py
@@ -16,7 +16,7 @@
from .sync_agent_status_task import sync_agent_status_task # noqa
from .sync_cmdb_cloud_area import sync_cmdb_cloud_area # noqa
from .sync_cmdb_host import sync_cmdb_host # noqa
from .sync_proc_status_task import sync_proc_status_task # noqa
from .sync_proc_status_task import sync_proc_status_periodic_task # noqa
from .update_proxy_file import update_proxy_files # noqa

# Whether to enable gse svr service discovery; once enabled, the default access point automatically refreshes gse svr info via zk
203 changes: 107 additions & 96 deletions apps/node_man/periodic_tasks/sync_proc_status_task.py
@@ -8,12 +8,14 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import time

from celery.task import periodic_task, task
from django.conf import settings

from apps.component.esbclient import client_v2
from apps.node_man import constants
from apps.node_man.models import GsePluginDesc, Host, ProcessStatus
from apps.node_man.models import Host, ProcessStatus
from apps.utils.periodic_task import calculate_countdown
from common.log import logger

@@ -23,119 +25,128 @@ def get_version(version_str):
return version.group() if version else ""


def query_proc_status(start=0, limit=constants.QUERY_PROC_STATUS_HOST_LENS):
kwargs = {"meta": {"namespace": "nodeman"}, "page": {"start": start, "limit": limit}}
def query_proc_status(proc_name, host_list):
"""
Benchmark against the cloud GSE API (hosts per call: observed latency)
200: 0.3s-0.4s (0.41s 0.29s 0.33s)
500: 0.35s-0.5s (0.34s 0.42s 0.51s)
1000: 0.5s-0.6s (0.53s 0.56s 0.61s)
2000: 0.9s (0.91s, 0.93s)
5000: 2s-4s (2.3s 2.2s 4.1s 4.3s)
"""
kwargs = {
"meta": {"namespace": "nodeman", "name": proc_name, "labels": {"proc_name": proc_name}},
"hosts": host_list,
}
data = client_v2.gse.get_proc_status(kwargs)

data = client_v2.gse.sync_proc_status(kwargs)
return data.get("count") or 0, data.get("proc_infos") or []
return data.get("proc_infos") or []


@task(queue="default", ignore_result=True)
def update_or_create_proc_status(task_id, sync_proc_list, start):
logger.info(f"{task_id} | Sync process status start flag: {start}")
_, proc_infos = query_proc_status(start)

bk_host_ips = []
bk_cloud_ids = []
proc_name_list = []
host_proc_status_map = {}

for info in proc_infos:
if info["meta"]["name"] not in sync_proc_list:
continue
if info["meta"]["name"] not in proc_name_list:
proc_name_list.append(info["meta"]["name"])
bk_host_ips.append(info["host"]["ip"])
bk_cloud_ids.append(info["host"]["bk_cloud_id"])
host_proc_status_map[f'{info["host"]["ip"]}:{info["host"]["bk_cloud_id"]}'] = {
"version": get_version(info["version"]),
"status": constants.PLUGIN_STATUS_DICT[info["status"]],
"is_auto": constants.AutoStateType.AUTO if info["isauto"] else constants.AutoStateType.UNAUTO,
"name": info["meta"]["name"],
}

# Query hosts that already exist
hosts = Host.objects.filter(inner_ip__in=bk_host_ips, bk_cloud_id__in=bk_cloud_ids).values(
"bk_host_id", "inner_ip", "bk_cloud_id"
)
def update_or_create_proc_status(task_id, hosts, sync_proc_list, start):
host_ip_cloud_list = []
bk_host_id_map = {}
for host in hosts:
bk_host_id_map[f"{host['inner_ip']}:{host['bk_cloud_id']}"] = host["bk_host_id"]

process_status_objs = ProcessStatus.objects.filter(
name__in=proc_name_list,
bk_host_id__in=bk_host_id_map.values(),
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
is_latest=True,
).values("bk_host_id", "id", "name", "status")

host_proc_key__proc_map = {}
for item in process_status_objs:
host_proc_key__proc_map[f"{item['name']}:{item['bk_host_id']}"] = item

need_update_status = []
need_create_status = []

for host_cloud_key, host_proc_info in host_proc_status_map.items():
if host_cloud_key not in bk_host_id_map:
continue
db_proc_info = host_proc_key__proc_map.get(f'{host_proc_info["name"]}:{bk_host_id_map[host_cloud_key]}')

# If the DB status is MANUAL_STOP and the synced status is TERMINATED, keep the MANUAL_STOP flag so subscription operations can exempt this process
if (
db_proc_info
and db_proc_info["status"] == constants.ProcStateType.MANUAL_STOP
and host_proc_info["status"] == constants.ProcStateType.TERMINATED
):
host_proc_info["status"] = db_proc_info["status"]

if db_proc_info:
# need update
obj = ProcessStatus(
pk=db_proc_info["id"],
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
)
need_update_status.append(obj)
else:
# need create
obj = ProcessStatus(
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
name=host_proc_info["name"],
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
bk_host_id=bk_host_id_map[host_cloud_key],
is_latest=True,
)
need_create_status.append(obj)

ProcessStatus.objects.bulk_update(need_update_status, fields=["status", "version", "is_auto"])
ProcessStatus.objects.bulk_create(need_create_status)
logger.info(f"{task_id} | Sync process status start flag: {start} complete")
host_ip_cloud_list.append({"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id})
bk_host_id_map[f"{host.inner_ip}:{host.bk_cloud_id}"] = host.bk_host_id

for proc_name in sync_proc_list:
past = time.time()
proc_infos = query_proc_status(proc_name, host_ip_cloud_list)
logger.info(f"this get_proc_status cost {time.time() - past}s")

host_proc_status_map = {}
for info in proc_infos:
host_proc_status_map[f'{info["host"]["ip"]}:{info["host"]["bk_cloud_id"]}'] = {
"version": get_version(info["version"]),
"status": constants.PLUGIN_STATUS_DICT[info["status"]],
"is_auto": constants.AutoStateType.AUTO if info["isauto"] else constants.AutoStateType.UNAUTO,
"name": info["meta"]["name"],
}

process_status_objs = ProcessStatus.objects.filter(
name=proc_name,
bk_host_id__in=bk_host_id_map.values(),
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
is_latest=True,
).values("bk_host_id", "id", "name", "status")

host_proc_key__proc_map = {}
for item in process_status_objs:
host_proc_key__proc_map[f"{item['name']}:{item['bk_host_id']}"] = item

need_update_status = []
need_create_status = []

for host_cloud_key, host_proc_info in host_proc_status_map.items():
if host_cloud_key not in bk_host_id_map:
continue
db_proc_info = host_proc_key__proc_map.get(f'{host_proc_info["name"]}:{bk_host_id_map[host_cloud_key]}')

# If the DB status is MANUAL_STOP and the synced status is TERMINATED, keep the MANUAL_STOP flag so subscription operations can exempt this process
if (
db_proc_info
and db_proc_info["status"] == constants.ProcStateType.MANUAL_STOP
and host_proc_info["status"] == constants.ProcStateType.TERMINATED
):
host_proc_info["status"] = db_proc_info["status"]

if db_proc_info:
# need update
obj = ProcessStatus(
pk=db_proc_info["id"],
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
)
need_update_status.append(obj)
else:
# need create
obj = ProcessStatus(
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
name=host_proc_info["name"],
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
bk_host_id=bk_host_id_map[host_cloud_key],
is_latest=True,
)
# Skip useless process entries (unregistered processes)
if obj.status != "UNREGISTER":
need_create_status.append(obj)

ProcessStatus.objects.bulk_update(need_update_status, fields=["status", "version", "is_auto"])
ProcessStatus.objects.bulk_create(need_create_status)
logger.info(f"{task_id} | Sync process status start flag: {start} complete")


@periodic_task(
queue="default",
options={"queue": "default"},
run_every=constants.SYNC_PROC_STATUS_TASK_INTERVAL,
)
def sync_proc_status_task():
sync_proc_list = GsePluginDesc.objects.filter(category=constants.CategoryType.official).values_list(
"name", flat=True
)
task_id = sync_proc_status_task.request.id
count, _ = query_proc_status(limit=1)
logger.info(f"{task_id} | sync host proc status count={count}.")
def sync_proc_status_periodic_task():
sync_proc_list = settings.HEAD_PLUGINS
task_id = sync_proc_status_periodic_task.request.id
hosts = Host.objects.all()
count = hosts.count()
logger.info(f"{task_id} | sync host proc status... host_count={count}.")

for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS):
countdown = calculate_countdown(
count=count / constants.QUERY_PROC_STATUS_HOST_LENS,
index=start / constants.QUERY_PROC_STATUS_HOST_LENS,
duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL,
)
logger.info(f"{task_id} | sync host proc status after {countdown} seconds")
update_or_create_proc_status.apply_async((task_id, sync_proc_list, start), countdown=countdown)

# (task_id, hosts[start: start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start)
update_or_create_proc_status.apply_async(
(task_id, hosts[start : start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start),
countdown=countdown,
)

logger.info(f"{task_id} | sync host proc status complete.")
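
`calculate_countdown` comes from `apps.utils.periodic_task` and is not shown in this diff; the way it is called above suggests it staggers the per-batch subtasks across the sync interval. A minimal stand-in, purely as a reading aid (the real helper may behave differently):

```python
def calculate_countdown(count: float, index: float, duration: int) -> float:
    """Illustrative stand-in only (assumption): spread `count` subtasks evenly over
    `duration` seconds and return the delay for the subtask at position `index`."""
    if count <= 1:
        return 0
    return duration / count * index


# Example: 10,000 hosts with QUERY_PROC_STATUS_HOST_LENS = 2000 gives 5 batches,
# dispatched roughly SYNC_PROC_STATUS_TASK_INTERVAL / 5 seconds apart.
```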
10 changes: 10 additions & 0 deletions apps/node_man/tests/test_pericdic_tasks/__init__.py
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
40 changes: 40 additions & 0 deletions apps/node_man/tests/test_pericdic_tasks/mock_data.py
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from apps.node_man import constants
from apps.node_man.models import ProcessStatus

MOCK_PROC_NAME = "test_proc"
MOCK_HOST = {
"bk_biz_id": 0,
"bk_host_id": 2147483640,
"inner_ip": "127.0.0.1",
"bk_cloud_id": 0,
}
MOCK_PROC_STATUS = {
"status": constants.ProcStateType.TERMINATED,
"is_latest": True,
"bk_host_id": MOCK_HOST["bk_host_id"],
"name": MOCK_PROC_NAME,
"source_type": ProcessStatus.SourceType.DEFAULT,
"proc_type": constants.ProcType.PLUGIN,
}
MOCK_GET_PROC_STATUS = {
"proc_infos": [
{
"host": {"ip": MOCK_HOST["inner_ip"], "bk_cloud_id": MOCK_HOST["bk_cloud_id"]},
"status": 1,
"version": "1.1.13",
"isauto": True,
"meta": {"name": MOCK_PROC_NAME, "namespace": "nodeman", "labels": {"proc_name": MOCK_PROC_NAME}},
}
]
}
(3 more changed files not rendered here)
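
The new mock data pairs with a test module among the files not rendered above. A rough sketch of how it could drive `update_or_create_proc_status`, assuming a Django `TestCase` with DB access and that `client_v2` is patched where the task module imports it (the patch target and the Host fields are assumptions, not part of this diff):

```python
from unittest.mock import MagicMock, patch

from django.test import TestCase

from apps.node_man.models import Host, ProcessStatus
from apps.node_man.periodic_tasks.sync_proc_status_task import update_or_create_proc_status
from apps.node_man.tests.test_pericdic_tasks import mock_data


class TestSyncProcStatus(TestCase):
    def test_update_or_create_proc_status(self):
        # The real Host model may require more fields than MOCK_HOST provides (assumption).
        Host.objects.create(**mock_data.MOCK_HOST)
        ProcessStatus.objects.create(**mock_data.MOCK_PROC_STATUS)

        mock_client = MagicMock()
        mock_client.gse.get_proc_status = MagicMock(return_value=mock_data.MOCK_GET_PROC_STATUS)

        # Assumed patch target: the client_v2 name imported by sync_proc_status_task.
        with patch("apps.node_man.periodic_tasks.sync_proc_status_task.client_v2", mock_client):
            update_or_create_proc_status("task_id", Host.objects.all(), [mock_data.MOCK_PROC_NAME], 0)

        proc = ProcessStatus.objects.get(
            bk_host_id=mock_data.MOCK_HOST["bk_host_id"], name=mock_data.MOCK_PROC_NAME
        )
        # MOCK_GET_PROC_STATUS reports version "1.1.13" for the mocked host.
        self.assertEqual(proc.version, "1.1.13")
```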
