feature: Periodic task to sync process status (close TencentBlueKing#380)
iSecloud committed Dec 21, 2021
1 parent a53cc55 commit f161db7
Showing 3 changed files with 118 additions and 96 deletions.
2 changes: 1 addition & 1 deletion apps/node_man/constants.py
@@ -505,7 +505,7 @@ def get_choices(cls):
# Periodic task related
QUERY_EXPIRED_INFO_LENS = 2000
QUERY_AGENT_STATUS_HOST_LENS = 500
QUERY_PROC_STATUS_HOST_LENS = 500
QUERY_PROC_STATUS_HOST_LENS = 2000
QUERY_CMDB_LIMIT = 500
QUERY_CMDB_MODULE_LIMIT = 500
QUERY_CLOUD_LIMIT = 200
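
The batch size for each process-status query rises from 500 to 2000 hosts per call. Going by the rough timings recorded in the query_proc_status_test helper added below (roughly 0.35-0.5s per 500-host call and roughly 0.9s per 2000-host call), the larger batch about halves the total GSE round-trip time for a big fleet while staying clear of the 5000-host sizes that degrade to 2-4s per call. A back-of-envelope sketch, using an illustrative fleet size and mid-range timings taken from those notes:

import math

# Approximate seconds per get_proc_status call, taken from the benchmark notes below.
timings = {500: 0.45, 2000: 0.9}
fleet = 10_000  # illustrative host count, not from the commit

for batch, per_call in timings.items():
    calls = math.ceil(fleet / batch)
    print(f"batch={batch}: {calls} calls, ~{calls * per_call:.1f}s of GSE time per plugin")
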
209 changes: 114 additions & 95 deletions apps/node_man/periodic_tasks/sync_proc_status_task.py
@@ -8,12 +8,14 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import time

from celery.task import periodic_task, task
from django.conf import settings

from apps.component.esbclient import client_v2
from apps.node_man import constants
from apps.node_man.models import GsePluginDesc, Host, ProcessStatus
from apps.node_man.models import Host, ProcessStatus
from apps.utils.periodic_task import calculate_countdown
from common.log import logger

@@ -23,99 +25,112 @@ def get_version(version_str):
return version.group() if version else ""


def query_proc_status(start=0, limit=constants.QUERY_PROC_STATUS_HOST_LENS):
kwargs = {"meta": {"namespace": "nodeman"}, "page": {"start": start, "limit": limit}}
def query_proc_status_test():
"""
Latency test of the cloud API (hosts per call -> observed round-trip time)
200: 0.3s-0.4s (0.41s 0.29s 0.33s)
500: 0.35s-0.5s (0.34s 0.42s 0.51s)
1000: 0.5s-0.6s (0.53s 0.56s 0.61s)
2000: 0.9s (0.91s, 0.93s)
5000: 2s-4s (2.3s 2.2s 4.1s 4.3s)
"""
past = time.time()
proc_name = "basereport"
hosts = Host.objects.all()[0 : constants.QUERY_PROC_STATUS_HOST_LENS]
host_ip_cloud_list = [{"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id} for host in hosts]
result = query_proc_status(proc_name, host_ip_cloud_list)

data = client_v2.gse.sync_proc_status(kwargs)
return data.get("count") or 0, data.get("proc_infos") or []
return result, time.time() - past


def query_proc_status(proc_name, host_list):
kwargs = {
"meta": {"namespace": "nodeman", "name": proc_name, "labels": {"proc_name": proc_name}},
"hosts": host_list,
}
data = client_v2.gse.get_proc_status(kwargs)

return data.get("proc_infos") or []


@task(queue="default", ignore_result=True)
def update_or_create_proc_status(task_id, sync_proc_list, start):
logger.info(f"{task_id} | Sync process status start flag: {start}")
_, proc_infos = query_proc_status(start)

bk_host_ips = []
bk_cloud_ids = []
proc_name_list = []
host_proc_status_map = {}

for info in proc_infos:
if info["meta"]["name"] not in sync_proc_list:
continue
if info["meta"]["name"] not in proc_name_list:
proc_name_list.append(info["meta"]["name"])
bk_host_ips.append(info["host"]["ip"])
bk_cloud_ids.append(info["host"]["bk_cloud_id"])
host_proc_status_map[f'{info["host"]["ip"]}:{info["host"]["bk_cloud_id"]}'] = {
"version": get_version(info["version"]),
"status": constants.PLUGIN_STATUS_DICT[info["status"]],
"is_auto": constants.AutoStateType.AUTO if info["isauto"] else constants.AutoStateType.UNAUTO,
"name": info["meta"]["name"],
}

# Query hosts that already exist
hosts = Host.objects.filter(inner_ip__in=bk_host_ips, bk_cloud_id__in=bk_cloud_ids).values(
"bk_host_id", "inner_ip", "bk_cloud_id"
)
def update_or_create_proc_status(task_id, hosts, sync_proc_list, start):
host_ip_cloud_list = []
bk_host_id_map = {}
for host in hosts:
bk_host_id_map[f"{host['inner_ip']}:{host['bk_cloud_id']}"] = host["bk_host_id"]

process_status_objs = ProcessStatus.objects.filter(
name__in=proc_name_list,
bk_host_id__in=bk_host_id_map.values(),
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
is_latest=True,
).values("bk_host_id", "id", "name", "status")

host_proc_key__proc_map = {}
for item in process_status_objs:
host_proc_key__proc_map[f"{item['name']}:{item['bk_host_id']}"] = item

need_update_status = []
need_create_status = []

for host_cloud_key, host_proc_info in host_proc_status_map.items():
if host_cloud_key not in bk_host_id_map:
continue
db_proc_info = host_proc_key__proc_map.get(f'{host_proc_info["name"]}:{bk_host_id_map[host_cloud_key]}')

# If the process status in the DB is MANUAL_STOP and the status synced back is TERMINATED, keep the MANUAL_STOP flag; it is used to exempt the process from subscription operations
if (
db_proc_info
and db_proc_info["status"] == constants.ProcStateType.MANUAL_STOP
and host_proc_info["status"] == constants.ProcStateType.TERMINATED
):
host_proc_info["status"] = db_proc_info["status"]

if db_proc_info:
# need update
obj = ProcessStatus(
pk=db_proc_info["id"],
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
)
need_update_status.append(obj)
else:
# need create
obj = ProcessStatus(
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
name=host_proc_info["name"],
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
bk_host_id=bk_host_id_map[host_cloud_key],
is_latest=True,
)
need_create_status.append(obj)

ProcessStatus.objects.bulk_update(need_update_status, fields=["status", "version", "is_auto"])
ProcessStatus.objects.bulk_create(need_create_status)
logger.info(f"{task_id} | Sync process status start flag: {start} complete")
for host in hosts[start : start + constants.QUERY_PROC_STATUS_HOST_LENS]:
host_ip_cloud_list.append({"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id})
bk_host_id_map[f"{host.inner_ip}:{host.bk_cloud_id}"] = host.bk_host_id

for proc_name in sync_proc_list:
past = time.time()
proc_infos = query_proc_status(proc_name, host_ip_cloud_list)
logger.info(f"get_proc_status cost {time.time() - past}s")

host_proc_status_map = {}
for info in proc_infos:
host_proc_status_map[f'{info["host"]["ip"]}:{info["host"]["bk_cloud_id"]}'] = {
"version": get_version(info["version"]),
"status": constants.PLUGIN_STATUS_DICT[info["status"]],
"is_auto": constants.AutoStateType.AUTO if info["isauto"] else constants.AutoStateType.UNAUTO,
"name": info["meta"]["name"],
}

process_status_objs = ProcessStatus.objects.filter(
name=proc_name,
bk_host_id__in=bk_host_id_map.values(),
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
is_latest=True,
).values("bk_host_id", "id", "name", "status")

host_proc_key__proc_map = {}
for item in process_status_objs:
host_proc_key__proc_map[f"{item['name']}:{item['bk_host_id']}"] = item

need_update_status = []
need_create_status = []

for host_cloud_key, host_proc_info in host_proc_status_map.items():
if host_cloud_key not in bk_host_id_map:
continue
db_proc_info = host_proc_key__proc_map.get(f'{host_proc_info["name"]}:{bk_host_id_map[host_cloud_key]}')

# If the process status in the DB is MANUAL_STOP and the status synced back is TERMINATED, keep the MANUAL_STOP flag; it is used to exempt the process from subscription operations
if (
db_proc_info
and db_proc_info["status"] == constants.ProcStateType.MANUAL_STOP
and host_proc_info["status"] == constants.ProcStateType.TERMINATED
):
host_proc_info["status"] = db_proc_info["status"]

if db_proc_info:
# need update
obj = ProcessStatus(
pk=db_proc_info["id"],
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
)
need_update_status.append(obj)
else:
# need create
obj = ProcessStatus(
status=host_proc_info["status"],
version=host_proc_info["version"],
is_auto=host_proc_info["is_auto"],
name=host_proc_info["name"],
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
bk_host_id=bk_host_id_map[host_cloud_key],
is_latest=True,
)
# Ignore useless process info: do not create records for unregistered processes
if obj.status != "UNREGISTER":
need_create_status.append(obj)

ProcessStatus.objects.bulk_update(need_update_status, fields=["status", "version", "is_auto"])
ProcessStatus.objects.bulk_create(need_create_status)
logger.info(f"{task_id} | Sync process status start flag: {start} complete")


@periodic_task(
@@ -124,18 +139,22 @@ def update_or_create_proc_status(task_id, sync_proc_list, start):
run_every=constants.SYNC_PROC_STATUS_TASK_INTERVAL,
)
def sync_proc_status_task():
sync_proc_list = GsePluginDesc.objects.filter(category=constants.CategoryType.official).values_list(
"name", flat=True
)

sync_proc_list = settings.HEAD_PLUGINS
task_id = sync_proc_status_task.request.id
count, _ = query_proc_status(limit=1)
logger.info(f"{task_id} | sync host proc status count={count}.")
hosts = Host.objects.all()
count = hosts.count()
logger.info(f"{task_id} | sync host proc status... host_count={count}.")

for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS):
countdown = calculate_countdown(
count=count / constants.QUERY_PROC_STATUS_HOST_LENS,
index=start / constants.QUERY_PROC_STATUS_HOST_LENS,
duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL,
)
logger.info(f"{task_id} | sync host proc status after {countdown} seconds")
update_or_create_proc_status.apply_async((task_id, sync_proc_list, start), countdown=countdown)

# update_or_create_proc_status(task_id, hosts, sync_proc_list, start)
update_or_create_proc_status.apply_async((task_id, hosts, sync_proc_list, start), countdown=countdown)

logger.info(f"{task_id} | sync host proc status complete.")
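
The periodic task slices the host table into 2000-host batches and uses calculate_countdown to spread the resulting subtasks across the sync interval. A minimal, self-contained sketch of that dispatch pattern, assuming an even-split countdown helper and illustrative values for the interval and fleet size (the project's calculate_countdown and SYNC_PROC_STATUS_TASK_INTERVAL may differ):

QUERY_PROC_STATUS_HOST_LENS = 2000
SYNC_PROC_STATUS_TASK_INTERVAL = 10 * 60  # illustrative 10-minute period, not the project's setting


def calculate_countdown(count, index, duration):
    # Assumed behaviour: spread `count` subtasks evenly over `duration` seconds.
    if count <= 1:
        return 0
    return int(duration / count * index)


host_count = 7500  # illustrative fleet size
for start in range(0, host_count, QUERY_PROC_STATUS_HOST_LENS):
    countdown = calculate_countdown(
        count=host_count / QUERY_PROC_STATUS_HOST_LENS,
        index=start / QUERY_PROC_STATUS_HOST_LENS,
        duration=SYNC_PROC_STATUS_TASK_INTERVAL,
    )
    print(f"batch starting at host index {start} dispatched after {countdown}s")
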
3 changes: 3 additions & 0 deletions dev_log/2.1.362/kiozhang_202112211506.yaml
@@ -0,0 +1,3 @@
---
feature:
- "Periodic task to sync process status (close #380)"
