diff --git a/dbm-ui/backend/configuration/handlers/password.py b/dbm-ui/backend/configuration/handlers/password.py index 3a117c15ec..d10486855b 100644 --- a/dbm-ui/backend/configuration/handlers/password.py +++ b/dbm-ui/backend/configuration/handlers/password.py @@ -16,12 +16,7 @@ from django_celery_beat.schedulers import ModelEntry from backend.components import DBPrivManagerApi -from backend.configuration.constants import ( - DB_ADMIN_USER_MAP, - DBM_PASSWORD_SECURITY_NAME, - MYSQL_ADMIN_USER, - AdminPasswordRole, -) +from backend.configuration.constants import DB_ADMIN_USER_MAP, DBM_PASSWORD_SECURITY_NAME, AdminPasswordRole from backend.configuration.exceptions import PasswordPolicyBaseException from backend.core.encrypt.constants import AsymmetricCipherConfigType from backend.core.encrypt.handlers import AsymmetricHandler @@ -75,7 +70,7 @@ def verify_password_strength( return check_result @classmethod - def query_mysql_admin_password( + def query_admin_password( cls, limit: int, offset: int, @@ -83,6 +78,7 @@ def query_mysql_admin_password( instances: List[str] = None, begin_time: str = None, end_time: str = None, + db_type: str = None, ): """ 获取mysql的admin密码 @@ -96,6 +92,11 @@ def query_mysql_admin_password( instances = instances or [] # 获取过滤条件 instance_list = [] + + # 判断db类型是否在映射字典中 + if db_type not in DB_ADMIN_USER_MAP: + raise PasswordPolicyBaseException(_("目前暂未支持{}类型的查询").format(db_type)) + for address in instances: split_len = len(address.split(":")) if split_len == 1: @@ -110,8 +111,7 @@ def query_mysql_admin_password( instance_list.append({"ip": ip, "port": int(port), "bk_cloud_id": bk_cloud_id}) else: raise PasswordPolicyBaseException(_("请保证查询的实例输入格式合法,格式为[CLOUD_ID:]IP:PORT")) - - filters = {"limit": limit, "offset": offset, "username": MYSQL_ADMIN_USER} + filters = {"limit": limit, "offset": offset, "username": DB_ADMIN_USER_MAP[db_type]} if instance_list: filters.update(instances=instance_list) if begin_time: @@ -122,14 +122,14 @@ def query_mysql_admin_password( filters.update(bk_biz_id=bk_biz_id) # 获取密码生效实例结果 - mysql_admin_password_data = DBPrivManagerApi.get_mysql_admin_password(params=filters) - mysql_admin_password_data["results"] = mysql_admin_password_data.pop("items") + admin_password_data = DBPrivManagerApi.get_mysql_admin_password(params=filters) + admin_password_data["results"] = admin_password_data.pop("items") cloud_info = ResourceQueryHelper.search_cc_cloud(get_cache=True) - for data in mysql_admin_password_data["results"]: + for data in admin_password_data["results"]: data["password"] = base64_decode(data["password"]) data["bk_cloud_name"] = cloud_info[str(data["bk_cloud_id"])]["bk_cloud_name"] - return mysql_admin_password_data + return admin_password_data @classmethod def modify_admin_password(cls, operator: str, password: str, lock_hour: int, instance_list: List[Dict]): diff --git a/dbm-ui/backend/configuration/serializers.py b/dbm-ui/backend/configuration/serializers.py index e8792e5045..76ed47e7ca 100644 --- a/dbm-ui/backend/configuration/serializers.py +++ b/dbm-ui/backend/configuration/serializers.py @@ -89,7 +89,7 @@ class CrontabSerializer(serializers.Serializer): crontab = CrontabSerializer(help_text=_("crontab表达式")) -class GetMySQLAdminPasswordSerializer(serializers.Serializer): +class GetAdminPasswordSerializer(serializers.Serializer): limit = serializers.IntegerField(help_text=_("分页限制"), required=False, default=10) offset = serializers.IntegerField(help_text=_("分页起始"), required=False, default=0) @@ -97,6 +97,7 @@ class GetMySQLAdminPasswordSerializer(serializers.Serializer): begin_time = DBTimezoneField(help_text=_("开始时间"), required=False) end_time = DBTimezoneField(help_text=_("结束时间"), required=False) instances = serializers.CharField(help_text=_("过滤的实例列表(通过,分割,实例格式为--cloud:ip:port)"), required=False) + db_type = serializers.ChoiceField(help_text=_("db类型"), choices=DBType.get_choices(), required=False, default="") class GetMySQLAdminPasswordResponseSerializer(serializers.Serializer): diff --git a/dbm-ui/backend/configuration/views/password_policy.py b/dbm-ui/backend/configuration/views/password_policy.py index 514b021c52..58302621d1 100644 --- a/dbm-ui/backend/configuration/views/password_policy.py +++ b/dbm-ui/backend/configuration/views/password_policy.py @@ -22,8 +22,8 @@ from backend.configuration.constants import DBM_PASSWORD_SECURITY_NAME from backend.configuration.handlers.password import DBPasswordHandler from backend.configuration.serializers import ( + GetAdminPasswordSerializer, GetMySQLAdminPasswordResponseSerializer, - GetMySQLAdminPasswordSerializer, GetRandomPasswordSerializer, ModifyAdminPasswordSerializer, ModifyMySQLPasswordRandomCycleSerializer, @@ -128,17 +128,17 @@ def query_random_cycle(self, request, *args, **kwargs): return Response({"crontab": crontab_exec}) @common_swagger_auto_schema( - operation_summary=_("查询mysql生效实例密码(admin)"), - request_body=GetMySQLAdminPasswordSerializer(), + operation_summary=_("查询生效实例密码(admin)"), + request_body=GetAdminPasswordSerializer(), responses={status.HTTP_200_OK: GetMySQLAdminPasswordResponseSerializer()}, tags=[SWAGGER_TAG], ) - @action(methods=["POST"], detail=False, serializer_class=GetMySQLAdminPasswordSerializer, pagination_class=None) - def query_mysql_admin_password(self, request, *args, **kwargs): + @action(methods=["POST"], detail=False, serializer_class=GetAdminPasswordSerializer, pagination_class=None) + def query_admin_password(self, request, *args, **kwargs): validated_data = self.params_validate(self.get_serializer_class()) if validated_data.get("instances"): validated_data["instances"] = validated_data["instances"].split(",") - return Response(DBPasswordHandler.query_mysql_admin_password(**validated_data)) + return Response(DBPasswordHandler.query_admin_password(**validated_data)) @common_swagger_auto_schema( operation_summary=_("修改db实例密码(admin)"),