diff --git a/apps/mock_data/views_mkd/job.py b/apps/mock_data/views_mkd/job.py index a222080449..60f42a2fa5 100644 --- a/apps/mock_data/views_mkd/job.py +++ b/apps/mock_data/views_mkd/job.py @@ -39,6 +39,25 @@ ], "retention": 1, } +JOB_REINSTALL_REQUEST_PARAMS = { + "job_type": constants.JobType.REINSTALL_AGENT, + "hosts": [ + { + "bk_host_id": host.DEFAULT_HOST_ID, + "bk_cloud_id": constants.DEFAULT_CLOUD, + "ap_id": constants.DEFAULT_AP_ID, + "install_channel_id": None, + "bk_biz_id": utils.DEFAULT_BK_BIZ_ID, + "os_type": constants.OsType.LINUX, + "inner_ip": host.DEFAULT_IP, + "inner_ipv6": host.DEFAULT_IPV6, + "account": constants.LINUX_ACCOUNT, + "port": settings.BKAPP_DEFAULT_SSH_PORT, + "auth_type": constants.AuthType.PASSWORD, + "password": "password", + } + ], +} JOB_OPERATE_REQUEST_PARAMS = {"job_type": constants.JobType.REINSTALL_AGENT, "bk_host_id": [host.DEFAULT_HOST_ID]} diff --git a/apps/node_man/periodic_tasks/sync_cmdb_host.py b/apps/node_man/periodic_tasks/sync_cmdb_host.py index e35a549436..0f25654d97 100644 --- a/apps/node_man/periodic_tasks/sync_cmdb_host.py +++ b/apps/node_man/periodic_tasks/sync_cmdb_host.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import ipaddress import math import typing @@ -15,6 +16,7 @@ from celery.task import periodic_task from django.conf import settings from django.db import transaction +from django.db.models import Q from apps.backend.celery import app from apps.backend.utils.redis import REDIS_INST @@ -374,7 +376,11 @@ def update_or_create_host_base(biz_id, ap_map_config, is_gse2_gray, task_id, cmd def sync_biz_incremental_hosts( - bk_biz_id: int, ap_map_config: SyncHostApMapConfig, expected_bk_host_ids: typing.Iterable[int], is_gse2_gray: bool + bk_biz_id: int, + ap_map_config: SyncHostApMapConfig, + expected_bk_host_ids: typing.Iterable[int], + is_gse2_gray: bool, + inner_ips: typing.List[str], ): """ 同步业务增量主机 @@ -382,6 +388,7 @@ def sync_biz_incremental_hosts( :param ap_map_config: :param expected_bk_host_ids: 期望得到的主机ID列表 :param is_gse2_gray: + :param inner_ips:内网IPv4/IPv6列表 :return: """ logger.info( @@ -389,13 +396,29 @@ def sync_biz_incremental_hosts( f"bk_biz_id -> {bk_biz_id}, expected_bk_host_ids -> {expected_bk_host_ids}" ) expected_bk_host_ids: typing.Set[int] = set(expected_bk_host_ids) - exists_host_ids: typing.Set[int] = set( - models.Host.objects.filter(bk_biz_id=bk_biz_id, bk_host_id__in=expected_bk_host_ids).values_list( - "bk_host_id", flat=True + common_query_conditions = Q(bk_biz_id=bk_biz_id) & Q(bk_host_id__in=expected_bk_host_ids) + if not inner_ips: + exists_host_ids: typing.Set[int] = set( + models.Host.objects.filter(common_query_conditions).values_list("bk_host_id", flat=True) + ) + else: + ipv4_list, ipv6_list = filter_ipv4_and_ipv6(inner_ips) + # 仅存在IPv4 + if ipv4_list and not ipv6_list: + query_conditions = common_query_conditions & Q(inner_ip__in=ipv4_list) + # 仅存在IPv6 + elif ipv6_list and not ipv4_list: + query_conditions = common_query_conditions & Q(inner_ipv6__in=ipv6_list) + # 两者都存在 + else: + query_conditions = common_query_conditions & (Q(inner_ip__in=ipv4_list) | Q(inner_ipv6__in=ipv6_list)) + + exists_host_ids: typing.Set[int] = set( + models.Host.objects.filter(query_conditions).values_list("bk_host_id", flat=True) ) - ) # 计算出对比本地主机缓存,增量的主机 ID incremental_host_ids: typing.List[int] = list(expected_bk_host_ids - exists_host_ids) + logger.info(f"need sync hosts: {incremental_host_ids}, length -> {len(incremental_host_ids)}") # 尝试获取增量主机信息 hosts: typing.List[typing.Dict] = query_biz_hosts(bk_biz_id=bk_biz_id, bk_host_ids=incremental_host_ids) # 更新本地缓存 @@ -408,10 +431,14 @@ def sync_biz_incremental_hosts( ) -def bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.Iterable[int]]): +def bulk_differential_sync_biz_hosts( + expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.Iterable[int]], + inner_ips_gby_bk_biz_id: typing.Dict[int, typing.List[str]] = None, +): """ 并发同步增量主机 :param expected_bk_host_ids_gby_bk_biz_id: 按业务ID聚合主机ID列表 + :param inner_ips_gby_bk_biz_id: 按业务ID聚合主机内网IP列表 :return: """ params_list: typing.List[typing.Dict] = [] @@ -419,14 +446,16 @@ def bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id: typing. gray_tools: GrayTools = GrayTools() # TODO 开始跳跃 for bk_biz_id, bk_host_ids in expected_bk_host_ids_gby_bk_biz_id.items(): - params_list.append( - { - "bk_biz_id": bk_biz_id, - "ap_map_config": ap_map_config, - "expected_bk_host_ids": bk_host_ids, - "is_gse2_gray": gray_tools.is_gse2_gray(bk_biz_id=bk_biz_id), - } - ) + params = { + "bk_biz_id": bk_biz_id, + "ap_map_config": ap_map_config, + "expected_bk_host_ids": bk_host_ids, + "is_gse2_gray": gray_tools.is_gse2_gray(bk_biz_id=bk_biz_id), + "inner_ips": None, + } + if inner_ips_gby_bk_biz_id: + params["inner_ips"] = inner_ips_gby_bk_biz_id.get(bk_biz_id) + params_list.append(params) batch_call(func=sync_biz_incremental_hosts, params_list=params_list) @@ -618,3 +647,25 @@ def clear_need_delete_host_ids_task(): """ task_id = clear_need_delete_host_ids_task.request.id clear_need_delete_host_ids(task_id) + + +def filter_ipv4_and_ipv6(ip_list): + """ + 过滤出列表中的IPv4、IPv6地址 + :param ip_list: 包含IP地址的列表 + :return: 包含IPv4、IPv6地址的列表 + """ + ipv4_list = [] + ipv6_list = [] + for ip in ip_list: + try: + # 尝试将字符串解析为IP地址对象 + ip_obj = ipaddress.ip_address(ip) + if isinstance(ip_obj, ipaddress.IPv4Address): + ipv4_list.append(ip) + if isinstance(ip_obj, ipaddress.IPv6Address): + ipv6_list.append(ip) + except ValueError: + # 如果解析失败,则跳过该IP地址 + continue + return ipv4_list, ipv6_list diff --git a/apps/node_man/serializers/job.py b/apps/node_man/serializers/job.py index dba4db497a..8d5fc28165 100644 --- a/apps/node_man/serializers/job.py +++ b/apps/node_man/serializers/job.py @@ -308,6 +308,7 @@ def validate(self, attrs): bk_biz_ids = set() expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.List[int]] = defaultdict(list) + inner_ips_gby_bk_biz_id: typing.Dict[int, typing.List[str]] = defaultdict(list) cipher = tools.HostTools.get_asymmetric_cipher() fields_need_decrypt = ["password", "key"] # 密码解密 @@ -325,10 +326,17 @@ def validate(self, attrs): if "bk_biz_id" not in host: raise ValidationError(_("主机信息缺少业务ID(bk_biz_id)")) expected_bk_host_ids_gby_bk_biz_id[host["bk_biz_id"]].append(host["bk_host_id"]) + if host.get("inner_ip"): + inner_ips_gby_bk_biz_id[host["bk_biz_id"]].append(host["inner_ip"]) + elif host.get("inner_ipv6"): + inner_ips_gby_bk_biz_id[host["bk_biz_id"]].append(host["inner_ipv6"]) if attrs["op_type"] not in [constants.OpType.INSTALL, constants.OpType.REPLACE]: # 差量同步主机 - bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id) + bulk_differential_sync_biz_hosts( + expected_bk_host_ids_gby_bk_biz_id=expected_bk_host_ids_gby_bk_biz_id, + inner_ips_gby_bk_biz_id=inner_ips_gby_bk_biz_id, + ) set_agent_setup_info_to_attrs(attrs) diff --git a/apps/node_man/tests/test_handlers/test_install_channel.py b/apps/node_man/tests/test_handlers/test_install_channel.py index 8906c430a8..0d0fc06c23 100644 --- a/apps/node_man/tests/test_handlers/test_install_channel.py +++ b/apps/node_man/tests/test_handlers/test_install_channel.py @@ -95,6 +95,6 @@ def test_install_channel_hidden(self): hidden=True, ) - self.assertEqual(len(self.client.get("/api/install_channel/")["data"]), 10) - self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": False})["data"]), 10) - self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": True})["data"]), 11) + self.assertEqual(len(self.client.get("/api/install_channel/")["data"]), 11) + self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": False})["data"]), 11) + self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": True})["data"]), 12) diff --git a/apps/node_man/tests/test_views/test_job_views.py b/apps/node_man/tests/test_views/test_job_views.py index 9ac8a08f41..f0aacfc247 100644 --- a/apps/node_man/tests/test_views/test_job_views.py +++ b/apps/node_man/tests/test_views/test_job_views.py @@ -12,6 +12,8 @@ import copy from unittest.mock import patch +from mock.mock import patch as mock_patch + from apps.exceptions import ValidationError from apps.mock_data import utils from apps.mock_data.common_unit import host @@ -94,3 +96,34 @@ def generate_install_job_request_params(): data["hosts"][0]["outer_ip"] = "" data["hosts"][0]["outer_ipv6"] = "" return data + + +class TestHostInfoNotUpdateCase(MockSuperUserMixin, CustomAPITestCase): + def setUp(self) -> None: + Host.objects.update_or_create( + defaults={ + "bk_cloud_id": constants.DEFAULT_CLOUD, + "node_type": constants.NodeType.AGENT, + "bk_biz_id": utils.DEFAULT_BK_BIZ_ID, + "inner_ip": host.DEFAULT_IP, + }, + bk_host_id=host.DEFAULT_HOST_ID, + ) + return super().setUp() + + @mock_patch("apps.node_man.handlers.job.JobHandler.create_subscription", Subscription.create_subscription) + def test_install(self): + data = copy.deepcopy(job.JOB_REINSTALL_REQUEST_PARAMS) + data["hosts"][0]["inner_ip"] = "127.0.0.2" + # 将CC返回为重装传入的IP后更新主机 + with mock_patch("apps.node_man.periodic_tasks.sync_cmdb_host.query_biz_hosts") as query_biz_hosts: + query_biz_hosts.return_value = [ + { + "bk_host_id": host.DEFAULT_HOST_ID, + "bk_host_innerip": "127.0.0.2", + "bk_cloud_id": constants.DEFAULT_CLOUD, + }, + ] + response = self.client.post(path="/api/job/install/", data=data) + # 成功创建安装任务;校验返回job_id + self.assertEqual(type(response["data"]["job_id"]), int)