Skip to content

Commit

Permalink
optimization: 分页时展示 Agent 实时状态 (closed TencentBlueKing#752)
Browse files Browse the repository at this point in the history
  • Loading branch information
CohleRustW committed Jul 24, 2023
1 parent e56c3c6 commit fc040d3
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 13 deletions.
40 changes: 39 additions & 1 deletion apps/core/ipchooser/tools/host_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,45 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import typing

from apps.node_man.constants import QUERY_AGENT_STATUS_HOST_LENS
from apps.node_man.models import Host
from apps.node_man.periodic_tasks.sync_agent_status_task import (
update_or_create_host_agent_status,
)
from common.log import logger


class HostTool:
pass
@classmethod
def fill_agent_state_info_to_hosts(cls, host_infos: typing.List[typing.Dict[str, typing.Any]]):
"""
实时查询 Agent 状态,并填充到主机信息列表中
:param host_infos: 主机信息列表
:return:
"""
if len(host_infos) > QUERY_AGENT_STATUS_HOST_LENS:
return host_infos

bk_host_ids: typing.List[int] = [host_info["bk_host_id"] for host_info in host_infos]

try:
host_id__agent_state_info: typing.Dict[int, str] = update_or_create_host_agent_status(
task_id="[fill_agent_state_info_to_hosts]",
host_queryset=Host.objects.filter(bk_host_id__in=bk_host_ids),
)
except Exception as e:
# 获取主机状态信息失败,跳过填充步骤
logger.error(f"fill_agent_state_info_to_hosts error: {e}")
return host_infos

for host_info in host_infos:
try:
bk_host_id: int = host_info["bk_host_id"]
host_info["status"] = host_id__agent_state_info[bk_host_id]["status_display"]
host_info["version"] = host_id__agent_state_info[bk_host_id]["version"]
except KeyError:
pass

return host_infos
20 changes: 12 additions & 8 deletions apps/node_man/handlers/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from apps.core.ipchooser import core_ipchooser_constants
from apps.core.ipchooser.tools.base import HostQuerySqlHelper
from apps.core.ipchooser.tools.host_tool import HostTool
from apps.node_man import constants as const
from apps.node_man.constants import IamActionType
from apps.node_man.exceptions import (
Expand Down Expand Up @@ -165,12 +166,12 @@ def list(self, params: dict, username: str):
return {"total": len(value_list), "list": value_list}

# 分页结果的Host_id, cloud_id集合
bk_hosts_id = [hs["bk_host_id"] for hs in hosts_status]
bk_clouds_id = [hs["bk_cloud_id"] for hs in hosts_status]
bk_host_ids: List[int] = [hs["bk_host_id"] for hs in hosts_status]
bk_cloud_ids: List[int] = [hs["bk_cloud_id"] for hs in hosts_status]

# 获得管控区域名称
cloud_name = dict(
Cloud.objects.filter(bk_cloud_id__in=bk_clouds_id).values_list("bk_cloud_id", "bk_cloud_name")
Cloud.objects.filter(bk_cloud_id__in=bk_cloud_ids).values_list("bk_cloud_id", "bk_cloud_name")
)
cloud_name[0] = str(const.DEFAULT_CLOUD_NAME)

Expand All @@ -180,7 +181,7 @@ def list(self, params: dict, username: str):
# 如果需要job result数据
host_id_job_status = {}
if "job_result" in params.get("extra_data", []):
job_status = JobTask.objects.filter(bk_host_id__in=bk_hosts_id).values(
job_status = JobTask.objects.filter(bk_host_id__in=bk_host_ids).values(
"bk_host_id", "instance_id", "job_id", "status", "current_step"
)
host_id_job_status = {
Expand All @@ -197,7 +198,7 @@ def list(self, params: dict, username: str):
host_id_identities = {}
if "identity_info" in params.get("extra_data", []):
# 获得主机信息
identities = IdentityData.objects.filter(bk_host_id__in=bk_hosts_id).values(
identities = IdentityData.objects.filter(bk_host_id__in=bk_host_ids).values(
"bk_host_id", "account", "auth_type", "port", "password", "key"
)

Expand All @@ -217,9 +218,12 @@ def list(self, params: dict, username: str):
for identity in identities
}

# 实时查询主机状态
filled_host_status: List[Dict[str, Any]] = HostTool.fill_agent_state_info_to_hosts(host_infos=hosts_status)

# 获得{biz:[bk_host_id]}格式数据
biz_host_id_map = {}
for hs in hosts_status:
for hs in filled_host_status:
if hs["bk_biz_id"] not in biz_host_id_map:
biz_host_id_map[hs["bk_biz_id"]] = [hs["bk_host_id"]]
else:
Expand All @@ -229,7 +233,7 @@ def list(self, params: dict, username: str):
topology = CmdbHandler().cmdb_or_cache_topo(username, user_biz, biz_host_id_map)

# 汇总
for hs in hosts_status:
for hs in filled_host_status:
hs["status_display"] = const.PROC_STATUS_CHN.get(hs["status"], "")
hs["bk_cloud_name"] = cloud_name.get(hs["bk_cloud_id"])
hs["install_channel_name"] = install_name_dict.get(hs["install_channel_id"])
Expand All @@ -239,7 +243,7 @@ def list(self, params: dict, username: str):
hs["topology"] = topology.get(hs["bk_host_id"], [])
hs["operate_permission"] = hs["bk_biz_id"] in agent_operate_bizs

result = {"total": hosts_status_count, "list": hosts_status}
result = {"total": hosts_status_count, "list": filled_host_status}

return result

Expand Down
12 changes: 8 additions & 4 deletions apps/node_man/periodic_tasks/sync_agent_status_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def update_or_create_host_agent_status(task_id: int, host_queryset: QuerySet):
)

# 通过管控区域:内网形式对应bk_host_id&node_from
agent_id__host_id_map: typing.Dict[str, int] = {}
agent_id__host_id_map: typing.Dict[str, int] = defaultdict(dict)
agent_id__node_from_map: typing.Dict[str, str] = {}

# 生成查询参数host弄表
Expand Down Expand Up @@ -101,8 +101,7 @@ def update_or_create_host_agent_status(task_id: int, host_queryset: QuerySet):
to_be_updated_node_from_host_objs: typing.List[Host] = []
to_be_updated_process_status_objs: typing.List[ProcessStatus] = []
to_be_created_process_status_objs: typing.List[ProcessStatus] = []
# TODO 实时更新 Agent 状态 - 建立主机 ID - Agent 状态信息映射关系,并返回给上层使用方,用于填充 Agent 状态
# host_id__agent_state_info = {}
host_id__agent_state_info: typing.Dict[int, typing.Dict[str, int]] = {}
for agent_id, agent_state_info in agent_id__agent_state_info_map.items():
process_status_info: typing.Optional[typing.Dict[str, typing.Any]] = host_id__process_status_info_map.get(
agent_id__host_id_map[agent_id]
Expand All @@ -126,6 +125,9 @@ def update_or_create_host_agent_status(task_id: int, host_queryset: QuerySet):
# 主机来源于自身,标记为终止
status = constants.ProcStateType.TERMINATED

agent_state_info["status_display"] = status
host_id__agent_state_info[agent_id__host_id_map[agent_id]] = agent_state_info

if not process_status_info:
# 如果不存在 ProcessStatus 对象需要创建
to_be_created_process_status_objs.append(
Expand Down Expand Up @@ -157,13 +159,15 @@ def update_or_create_host_agent_status(task_id: int, host_queryset: QuerySet):
ProcessStatus.objects.bulk_create(to_be_created_process_status_objs, batch_size=1000)
logger.info(f"{task_id} | sync_agent_status_task: Created {len(to_be_created_process_status_objs)} records")
if to_be_delete_process_status_ids:
__, delete_row_count = ProcessStatus.objects.filter(id__in=to_be_delete_process_status_ids).delete()
_, delete_row_count = ProcessStatus.objects.filter(id__in=to_be_delete_process_status_ids).delete()
logger.info(f"{task_id} | sync_agent_status_task: Deleted {delete_row_count} duplicate records")
logger.info(
f"{task_id} | sync_agent_status_task: Complete agent status update, "
f"start Host ID -> {hosts[0]['bk_host_id']}, count -> {len(hosts)}"
)

return host_id__agent_state_info


@periodic_task(
queue="default",
Expand Down

0 comments on commit fc040d3

Please sign in to comment.