diff --git a/common/auth.py b/common/auth.py index 31b1ad67e4..9c770e9329 100644 --- a/common/auth.py +++ b/common/auth.py @@ -75,7 +75,11 @@ def authenticate(self): init_user(authenticated_user) return {"status": 0, "msg": "ok", "data": authenticated_user} else: - return {"status": 1, "msg": "用户名或密码错误,请重新输入!", "data": ""} + return { + "status": 1, + "msg": "用户名或密码错误,请重新输入!", + "data": "", + } except: logger.error("验证用户密码时报错") logger.error(traceback.format_exc()) diff --git a/common/tests.py b/common/tests.py index a4345b76d2..9ad46d16b2 100644 --- a/common/tests.py +++ b/common/tests.py @@ -539,7 +539,10 @@ def test_init_user(self): class PermissionTest(TestCase): def setUp(self) -> None: self.user = User.objects.create( - username="test_user", display="中文显示", is_active=True, email="XXX@xxx.com" + username="test_user", + display="中文显示", + is_active=True, + email="XXX@xxx.com", ) self.client.force_login(self.user) diff --git a/common/twofa/sms.py b/common/twofa/sms.py index 7e69f05415..e821a50e06 100644 --- a/common/twofa/sms.py +++ b/common/twofa/sms.py @@ -45,9 +45,9 @@ def get_captcha(self, **kwargs): result = {"status": 1, "msg": "系统未配置短信服务商!"} else: result["status"] = 1 - result[ - "msg" - ] = f"获取验证码太频繁,请于{captcha['update_time'] - int(time.time()) + 60}秒后再试" + result["msg"] = ( + f"获取验证码太频繁,请于{captcha['update_time'] - int(time.time()) + 60}秒后再试" + ) else: if self.client: result = self.client.send_code(**kwargs) diff --git a/common/utils/permission.py b/common/utils/permission.py index 43eb9f3218..3d6a33433a 100644 --- a/common/utils/permission.py +++ b/common/utils/permission.py @@ -33,7 +33,11 @@ def wrapper(request, *args, **kw): if user.role not in roles and user.is_superuser is False: is_ajax = request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest" if is_ajax: - result = {"status": 1, "msg": "您无权操作,请联系管理员", "data": []} + result = { + "status": 1, + "msg": "您无权操作,请联系管理员", + "data": [], + } return HttpResponse( json.dumps(result), content_type="application/json" ) diff --git a/common/utils/sendmsg.py b/common/utils/sendmsg.py index 8e2903e0af..f6713431f2 100755 --- a/common/utils/sendmsg.py +++ b/common/utils/sendmsg.py @@ -126,7 +126,9 @@ def send_email(self, subject, body, to, **kwargs): self.MAIL_REVIEW_FROM_ADDR, to + list_cc, main_msg.as_string() ) server.quit() - logger.debug(f"邮件推送成功\n消息标题:{subject}\n通知对象:{to + list_cc}\n消息内容:{body}") + logger.debug( + f"邮件推送成功\n消息标题:{subject}\n通知对象:{to + list_cc}\n消息内容:{body}" + ) return "success" except Exception: errmsg = "邮件推送失败\n{}".format(traceback.format_exc()) @@ -150,7 +152,9 @@ def send_ding(url, content): if r_json["errcode"] == 0: logger.debug(f"钉钉Webhook推送成功\n通知对象:{url}\n消息内容:{content}") else: - logger.error(f"钉钉Webhook推送失败错误码\n请求url:{url}\n请求data:{data}\n请求响应:{r_json}") + logger.error( + f"钉钉Webhook推送失败错误码\n请求url:{url}\n请求data:{data}\n请求响应:{r_json}" + ) def send_ding2user(self, userid_list, content): """ @@ -171,7 +175,9 @@ def send_ding2user(self, userid_list, content): if r_json["errcode"] == 0: logger.debug(f"钉钉推送成功\n通知对象:{userid_list}\n消息内容:{content}") else: - logger.error(f"钉钉推送失败\n请求连接:{send_url}\n请求参数:{data}\n请求响应:{r_json}") + logger.error( + f"钉钉推送失败\n请求连接:{send_url}\n请求参数:{data}\n请求响应:{r_json}" + ) def send_wx2user(self, msg, user_list): if not user_list: @@ -191,7 +197,9 @@ def send_wx2user(self, msg, user_list): if r_json["errcode"] == 0: logger.debug(f"企业微信推送成功\n通知对象:{to_user}") else: - logger.error(f"企业微信推送失败\n请求连接:{send_url}\n请求参数:{data}\n请求响应:{r_json}") + logger.error( + f"企业微信推送失败\n请求连接:{send_url}\n请求参数:{data}\n请求响应:{r_json}" + ) def send_qywx_webhook(self, qywx_webhook, msg): send_url = qywx_webhook @@ -212,7 +220,9 @@ def send_qywx_webhook(self, qywx_webhook, msg): if r_json["errcode"] == 0: logger.debug(f"企业微信机器人推送成功\n通知对象:机器人") else: - logger.error(f"企业微信机器人推送失败\n请求连接:{send_url}\n请求参数:{data}\n请求响应:{r_json}") + logger.error( + f"企业微信机器人推送失败\n请求连接:{send_url}\n请求参数:{data}\n请求响应:{r_json}" + ) @staticmethod def send_feishu_webhook(url, title, content): @@ -239,7 +249,9 @@ def send_feishu_webhook(url, title, content): ): logger.debug(f"飞书Webhook推送成功\n通知对象:{url}\n消息内容:{content}") else: - logger.error(f"飞书Webhook推送失败错误码\n请求url:{url}\n请求data:{data}\n请求响应:{r_json}") + logger.error( + f"飞书Webhook推送失败错误码\n请求url:{url}\n请求data:{data}\n请求响应:{r_json}" + ) @staticmethod def send_feishu_user(title, content, open_id, user_mail): @@ -261,4 +273,6 @@ def send_feishu_user(title, content, open_id, user_mail): if r["code"] == 0: logger.debug(f"飞书单推推送成功\n通知对象:{url}\n消息内容:{content}") else: - logger.error(f"飞书单推推送失败错误码\n请求url:{url}\n请求data:{data}\n请求响应:{r}") + logger.error( + f"飞书单推推送失败错误码\n请求url:{url}\n请求data:{data}\n请求响应:{r}" + ) diff --git a/sql/archiver.py b/sql/archiver.py index e27d545470..f191c1fd9a 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -148,7 +148,9 @@ def archive_apply(request): ): return JsonResponse({"status": 1, "msg": "请填写完整!", "data": {}}) if mode == "dest" and not all([dest_instance_name, dest_db_name, dest_table_name]): - return JsonResponse({"status": 1, "msg": "归档到实例时目标实例信息必选!", "data": {}}) + return JsonResponse( + {"status": 1, "msg": "归档到实例时目标实例信息必选!", "data": {}} + ) # 获取源实例信息 try: @@ -165,7 +167,9 @@ def archive_apply(request): instance_name=dest_instance_name ) except Instance.DoesNotExist: - return JsonResponse({"status": 1, "msg": "你所在组未关联该实例!", "data": {}}) + return JsonResponse( + {"status": 1, "msg": "你所在组未关联该实例!", "data": {}} + ) else: d_ins = None @@ -203,7 +207,9 @@ def archive_apply(request): audit_handler.create_audit() except AuditException as e: logger.error(f"新建审批流失败: {str(e)}") - return JsonResponse({"status": 1, "msg": "新建审批流失败, 请联系管理员", "data": {}}) + return JsonResponse( + {"status": 1, "msg": "新建审批流失败, 请联系管理员", "data": {}} + ) audit_handler.workflow.status = audit_handler.audit.current_status if audit_handler.audit.current_status == WorkflowStatus.PASSED: audit_handler.workflow.state = True @@ -376,9 +382,9 @@ def archive(archive_id): elif mode == "file": output_directory = os.path.join(settings.BASE_DIR, "downloads/archiver") os.makedirs(output_directory, exist_ok=True) - args[ - "file" - ] = f"{output_directory}/{s_ins.instance_name}-{src_db_name}-{src_table_name}.txt" + args["file"] = ( + f"{output_directory}/{s_ins.instance_name}-{src_db_name}-{src_table_name}.txt" + ) if no_delete: args["no-delete"] = True elif mode == "purge": @@ -447,9 +453,11 @@ def archive(archive_id): shell_cmd = " ".join(cmd_args) ArchiveLog.objects.create( archive=archive_info, - cmd=shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***") - if mode == "dest" - else shell_cmd.replace(s_ins.password, "***"), + cmd=( + shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***") + if mode == "dest" + else shell_cmd.replace(s_ins.password, "***") + ), condition=condition, mode=mode, no_delete=no_delete, diff --git a/sql/data_dictionary.py b/sql/data_dictionary.py index 6916883e39..68b0415549 100644 --- a/sql/data_dictionary.py +++ b/sql/data_dictionary.py @@ -121,7 +121,9 @@ def export(request): elif request.user.is_superuser: dbs = query_engine.get_all_databases().rows else: - return JsonResponse({"status": 1, "msg": "仅管理员可以导出整个实例的字典信息!", "data": []}) + return JsonResponse( + {"status": 1, "msg": "仅管理员可以导出整个实例的字典信息!", "data": []} + ) # 获取数据,存入目录 path = os.path.join(settings.BASE_DIR, "downloads", "dictionary") @@ -149,9 +151,9 @@ def export(request): return JsonResponse({"status": 1, "msg": "实例名或db名不合法", "data": []}) response = FileResponse(open(fullpath, "rb")) response["Content-Type"] = "application/octet-stream" - response[ - "Content-Disposition" - ] = f'attachment;filename="{quote(instance_name)}_{quote(db_name)}.html"' + response["Content-Disposition"] = ( + f'attachment;filename="{quote(instance_name)}_{quote(db_name)}.html"' + ) return response else: diff --git a/sql/engines/__init__.py b/sql/engines/__init__.py index c3c6f7ddd9..6858e6dcfa 100644 --- a/sql/engines/__init__.py +++ b/sql/engines/__init__.py @@ -1,4 +1,5 @@ """engine base库, 包含一个``EngineBase`` class和一个get_engine函数""" + import importlib from sql.engines.models import ResultSet, ReviewSet from sql.utils.ssh_tunnel import SSHConnection diff --git a/sql/engines/goinception.py b/sql/engines/goinception.py index c97d328e2a..6f9fa29007 100644 --- a/sql/engines/goinception.py +++ b/sql/engines/goinception.py @@ -86,7 +86,9 @@ def execute_check(self, instance=None, db_name=None, sql=""): {sql.rstrip(';')}; inception_magic_commit;""" inception_result = self.query(sql=inception_sql) - check_result.syntax_type = 2 # TODO 工单类型 0、其他 1、DDL,2、DML 仅适用于MySQL,待调整 + check_result.syntax_type = ( + 2 # TODO 工单类型 0、其他 1、DDL,2、DML 仅适用于MySQL,待调整 + ) for r in inception_result.rows: check_result.rows += [ReviewResult(inception_result=r)] if r[2] == 1: # 警告 diff --git a/sql/engines/mongo.py b/sql/engines/mongo.py index 17e5794d1b..b1ededea1b 100644 --- a/sql/engines/mongo.py +++ b/sql/engines/mongo.py @@ -363,7 +363,9 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""): msg = "\n".join(_re_msg) msg = msg.replace("true\n", "") except Exception as e: - logger.warning(f"mongo语句执行报错,语句:{sql},{e}错误信息{traceback.format_exc()}") + logger.warning( + f"mongo语句执行报错,语句:{sql},{e}错误信息{traceback.format_exc()}" + ) finally: if is_load: fp.close() @@ -639,7 +641,8 @@ def execute_check(self, db_name=None, sql=""): id=line, errlevel=2, stagestatus="后台创建索引", - errormessage="创建索引没有加 background:true" + alert, + errormessage="创建索引没有加 background:true" + + alert, sql=check_sql, ) elif not is_in: @@ -650,7 +653,8 @@ def execute_check(self, db_name=None, sql=""): ) # 获得表的总条数 if count >= 5000000: check_result.warning = ( - alert + "大于500万条,请在业务低谷期创建索引" + alert + + "大于500万条,请在业务低谷期创建索引" ) check_result.warning_count += 1 result = ReviewResult( @@ -1119,7 +1123,9 @@ def query(self, db_name=None, sql="", limit_num=0, close_conn=True, **kwargs): ) except Exception as e: - logger.warning(f"Mongo命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}") + logger.warning( + f"Mongo命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" + ) result_set.error = str(e) finally: if close_conn: diff --git a/sql/engines/mssql.py b/sql/engines/mssql.py index 6d930b3f59..8b1344e422 100644 --- a/sql/engines/mssql.py +++ b/sql/engines/mssql.py @@ -352,7 +352,9 @@ def query( result_set.rows = [tuple(x) for x in rows] result_set.affected_rows = len(result_set.rows) except Exception as e: - logger.warning(f"MsSQL语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"MsSQL语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result_set.error = str(e) finally: if close_conn: @@ -420,7 +422,9 @@ def execute(self, db_name=None, sql="", close_conn=True, parameters=None): try: cursor.execute(statement) except Exception as e: - logger.warning(f"Mssql命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}") + logger.warning( + f"Mssql命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" + ) execute_result.error = str(e) execute_result.rows.append( ReviewResult( diff --git a/sql/engines/mysql.py b/sql/engines/mysql.py index ebd825a705..033bb02ecf 100644 --- a/sql/engines/mysql.py +++ b/sql/engines/mysql.py @@ -524,7 +524,9 @@ def query( if kwargs.get("binary_as_hex"): result_set = self.result_set_binary_as_hex(result_set) except Exception as e: - logger.warning(f"MySQL语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"MySQL语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result_set.error = str(e) finally: if close_conn: @@ -617,14 +619,18 @@ def execute_check(self, db_name=None, sql=""): instance=self.instance, db_name=db_name, sql=sql ) except Exception as e: - logger.debug(f"{self.inc_engine.name}检测语句报错:错误信息{traceback.format_exc()}") + logger.debug( + f"{self.inc_engine.name}检测语句报错:错误信息{traceback.format_exc()}" + ) raise RuntimeError( f"{self.inc_engine.name}检测语句报错,请注意检查系统配置中{self.inc_engine.name}配置,错误信息:\n{e}" ) # 判断Inception检测结果 if check_result.error: - logger.debug(f"{self.inc_engine.name}检测语句报错:错误信息{check_result.error}") + logger.debug( + f"{self.inc_engine.name}检测语句报错:错误信息{check_result.error}" + ) raise RuntimeError( f"{self.inc_engine.name}检测语句报错,错误信息:\n{check_result.error}" ) @@ -699,7 +705,9 @@ def execute(self, db_name=None, sql="", close_conn=True, parameters=None): conn.commit() cursor.close() except Exception as e: - logger.warning(f"MySQL语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"MySQL语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result.error = str(e) if close_conn: self.close() diff --git a/sql/engines/oracle.py b/sql/engines/oracle.py index 96c2bd29d5..ab86c04e7a 100644 --- a/sql/engines/oracle.py +++ b/sql/engines/oracle.py @@ -608,7 +608,9 @@ def explain_check(self, db_name=None, sql="", close_conn=False): else: result["rows"] = rows[0] except Exception as e: - logger.warning(f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result["msg"] = str(e) finally: if close_conn: @@ -691,7 +693,9 @@ def query( result_set.rows = [tuple(x) for x in rows] result_set.affected_rows = len(result_set.rows) except Exception as e: - logger.warning(f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result_set.error = str(e) finally: if close_conn: @@ -751,7 +755,9 @@ def execute_check(self, db_name=None, sql="", close_conn=True): id=line, errlevel=2, stagestatus="驳回高危SQL", - errormessage="禁止提交匹配" + critical_ddl_regex + "条件的语句!", + errormessage="禁止提交匹配" + + critical_ddl_regex + + "条件的语句!", sql=sqlitem.statement, ) # 驳回未带where数据修改语句,如确实需做全部删除或更新,显示的带上where 1=1 @@ -1421,7 +1427,9 @@ def sqltuningadvisor(self, db_name=None, sql="", close_conn=True, **kwargs): result_set.rows = [tuple(x) for x in rows] result_set.affected_rows = len(result_set.rows) except Exception as e: - logger.warning(f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"Oracle 语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result_set.error = str(e) finally: # 结束分析任务 @@ -1445,7 +1453,9 @@ def execute(self, db_name=None, sql="", close_conn=True, parameters=None): statement = statement.rstrip(";") cursor.execute(statement, parameters or []) except Exception as e: - logger.warning(f"Oracle语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"Oracle语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result.error = str(e) if close_conn: self.close() diff --git a/sql/engines/pgsql.py b/sql/engines/pgsql.py index c87cb91e8e..1554d0ed73 100644 --- a/sql/engines/pgsql.py +++ b/sql/engines/pgsql.py @@ -208,7 +208,9 @@ def query( result_set.rows = rows result_set.affected_rows = effect_row except Exception as e: - logger.warning(f"PgSQL命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}") + logger.warning( + f"PgSQL命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" + ) result_set.error = str(e) finally: if close_conn: diff --git a/sql/engines/phoenix.py b/sql/engines/phoenix.py index fc7ac592d1..5579702603 100644 --- a/sql/engines/phoenix.py +++ b/sql/engines/phoenix.py @@ -121,7 +121,9 @@ def query( result_set.rows = [tuple(x) for x in rows] result_set.affected_rows = len(result_set.rows) except Exception as e: - logger.warning(f"PhoenixDB语句执行报错,语句:{sql},错误信息{traceback.format_exc()}") + logger.warning( + f"PhoenixDB语句执行报错,语句:{sql},错误信息{traceback.format_exc()}" + ) result_set.error = str(e) finally: if close_conn: @@ -170,7 +172,9 @@ def execute(self, db_name=None, sql="", close_conn=True, parameters=None): try: cursor.execute(statement.rstrip(";"), parameters) except Exception as e: - logger.error(f"Phoenix命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}") + logger.error( + f"Phoenix命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" + ) execute_result.error = str(e) execute_result.rows.append( ReviewResult( diff --git a/sql/engines/redis.py b/sql/engines/redis.py index d58249067a..36f5bda592 100644 --- a/sql/engines/redis.py +++ b/sql/engines/redis.py @@ -144,7 +144,9 @@ def query(self, db_name=None, sql="", limit_num=0, close_conn=True, **kwargs): if limit_num > 0: result_set.rows = result_set.rows[0:limit_num] except Exception as e: - logger.warning(f"Redis命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}") + logger.warning( + f"Redis命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" + ) result_set.error = str(e) return result_set diff --git a/sql/engines/tests.py b/sql/engines/tests.py index 5b49eee7a8..03f5b4b688 100644 --- a/sql/engines/tests.py +++ b/sql/engines/tests.py @@ -2023,7 +2023,8 @@ def test_execute_check_select_sql(self): select_sql = "select id,name from tb_test" check_result = new_engine.execute_check(db_name="some_db", sql=select_sql) self.assertEqual( - check_result.rows[0].errormessage, "仅支持DML和DDL语句,查询语句请使用SQL查询功能!" + check_result.rows[0].errormessage, + "仅支持DML和DDL语句,查询语句请使用SQL查询功能!", ) @patch.object(ClickHouseEngine, "query") diff --git a/sql/form.py b/sql/form.py index 3187b08833..348bba8b38 100644 --- a/sql/form.py +++ b/sql/form.py @@ -31,7 +31,9 @@ def clean(self): str(pkey_path, "utf-8").replace(r"\r", "").replace(r"\n", "") ) except IOError: - raise ValidationError("秘钥文件不存在, 请勾选秘钥路径的清除选项再进行保存") + raise ValidationError( + "秘钥文件不存在, 请勾选秘钥路径的清除选项再进行保存" + ) class InstanceForm(ModelForm): diff --git a/sql/instance.py b/sql/instance.py index a73da10ed2..54486be745 100644 --- a/sql/instance.py +++ b/sql/instance.py @@ -186,7 +186,11 @@ def param_edit(request): variable_name=variable_name, variable_value=variable_value ) if set_result.error: - result = {"status": 1, "msg": f"设置错误,错误信息:{set_result.error}", "data": []} + result = { + "status": 1, + "msg": f"设置错误,错误信息:{set_result.error}", + "data": [], + } return HttpResponse(json.dumps(result), content_type="application/json") # 修改成功的保存修改记录 else: diff --git a/sql/instance_account.py b/sql/instance_account.py index 0e085cd00c..2f6f6ceae8 100644 --- a/sql/instance_account.py +++ b/sql/instance_account.py @@ -89,7 +89,9 @@ def create(request): ) or ( instance.db_type == "mongo" and not all([db_name, user, password1, password2]) ): - return JsonResponse({"status": 1, "msg": "参数不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "参数不完整,请确认后提交", "data": []} + ) if password1 != password2: return JsonResponse({"status": 1, "msg": "两次输入密码不一致", "data": []}) @@ -137,7 +139,9 @@ def edit(request): if (instance.db_type == "mysql" and not all([user, host])) or ( instance.db_type == "mongo" and not all([db_name, user]) ): - return JsonResponse({"status": 1, "msg": "参数不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "参数不完整,请确认后提交", "data": []} + ) # 保存到数据库 if password: @@ -186,7 +190,9 @@ def grant(request): if priv_type == 0: global_privs = privs["global_privs"] if not all([global_privs]): - return JsonResponse({"status": 1, "msg": "信息不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "信息不完整,请确认后提交", "data": []} + ) global_privs = ["GRANT OPTION" if g == "GRANT" else g for g in global_privs] if op_type == 0: grant_sql = f"GRANT {','.join(global_privs)} ON *.* TO {user_host};" @@ -198,7 +204,9 @@ def grant(request): db_privs = privs["db_privs"] db_name = request.POST.getlist("db_name[]") if not all([db_privs, db_name]): - return JsonResponse({"status": 1, "msg": "信息不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "信息不完整,请确认后提交", "data": []} + ) for db in db_name: db_privs = ["GRANT OPTION" if d == "GRANT" else d for d in db_privs] if op_type == 0: @@ -215,7 +223,9 @@ def grant(request): db_name = request.POST.get("db_name") tb_name = request.POST.getlist("tb_name[]") if not all([tb_privs, db_name, tb_name]): - return JsonResponse({"status": 1, "msg": "信息不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "信息不完整,请确认后提交", "data": []} + ) for tb in tb_name: tb_privs = ["GRANT OPTION" if t == "GRANT" else t for t in tb_privs] if op_type == 0: @@ -229,7 +239,9 @@ def grant(request): tb_name = request.POST.get("tb_name") col_name = request.POST.getlist("col_name[]") if not all([col_privs, db_name, tb_name, col_name]): - return JsonResponse({"status": 1, "msg": "信息不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "信息不完整,请确认后提交", "data": []} + ) for priv in col_privs: if op_type == 0: grant_sql += f"GRANT {priv}(`{'`,`'.join(col_name)}`) ON `{db_name}`.`{tb_name}` TO {user_host};" @@ -281,7 +293,9 @@ def reset_pwd(request): ) or ( instance.db_type == "mongo" and not all([db_name, user, reset_pwd1, reset_pwd2]) ): - return JsonResponse({"status": 1, "msg": "参数不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "参数不完整,请确认后提交", "data": []} + ) if reset_pwd1 != reset_pwd2: return JsonResponse({"status": 1, "msg": "两次输入密码不一致", "data": []}) @@ -323,7 +337,9 @@ def lock(request): lock_sql = "" if not all([user_host]): - return JsonResponse({"status": 1, "msg": "参数不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "参数不完整,请确认后提交", "data": []} + ) try: instance = user_instances(request.user, db_type=["mysql"]).get(id=instance_id) @@ -365,7 +381,9 @@ def delete(request): if (instance.db_type == "mysql" and not all([user_host])) or ( instance.db_type == "mongo" and not all([db_name_user]) ): - return JsonResponse({"status": 1, "msg": "参数不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "参数不完整,请确认后提交", "data": []} + ) engine = get_engine(instance=instance) exec_result = engine.drop_instance_user( diff --git a/sql/instance_database.py b/sql/instance_database.py index 223b3f36b2..42aaf257f1 100644 --- a/sql/instance_database.py +++ b/sql/instance_database.py @@ -76,7 +76,9 @@ def create(request): remark = request.POST.get("remark", "") if not all([db_name]): - return JsonResponse({"status": 1, "msg": "参数不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "参数不完整,请确认后提交", "data": []} + ) try: instance = user_instances(request.user, db_type=["mysql", "mongo"]).get( @@ -138,7 +140,9 @@ def edit(request): remark = request.POST.get("remark", "") if not all([db_name]): - return JsonResponse({"status": 1, "msg": "参数不完整,请确认后提交", "data": []}) + return JsonResponse( + {"status": 1, "msg": "参数不完整,请确认后提交", "data": []} + ) try: instance = user_instances(request.user, db_type=["mysql", "mongo"]).get( diff --git a/sql/models.py b/sql/models.py index aaa0b00a3e..05c5eb0ce2 100755 --- a/sql/models.py +++ b/sql/models.py @@ -23,7 +23,9 @@ class ResourceGroup(models.Model): ding_webhook = models.CharField("钉钉webhook地址", max_length=255, blank=True) feishu_webhook = models.CharField("飞书webhook地址", max_length=255, blank=True) qywx_webhook = models.CharField("企业微信webhook地址", max_length=255, blank=True) - is_deleted = models.IntegerField("是否删除", choices=((0, "否"), (1, "是")), default=0) + is_deleted = models.IntegerField( + "是否删除", choices=((0, "否"), (1, "是")), default=0 + ) create_time = models.DateTimeField(auto_now_add=True) sys_time = models.DateTimeField(auto_now=True) @@ -47,7 +49,9 @@ class Users(AbstractUser): wx_user_id = models.CharField("企业微信UserID", max_length=64, blank=True) feishu_open_id = models.CharField("飞书OpenID", max_length=64, blank=True) failed_login_count = models.IntegerField("失败计数", default=0) - last_login_failed_at = models.DateTimeField("上次失败登录时间", blank=True, null=True) + last_login_failed_at = models.DateTimeField( + "上次失败登录时间", blank=True, null=True + ) resource_group = models.ManyToManyField( ResourceGroup, verbose_name="资源组", blank=True ) @@ -207,7 +211,9 @@ class Instance(models.Model): resource_group = models.ManyToManyField( ResourceGroup, verbose_name="资源组", blank=True ) - instance_tag = models.ManyToManyField(InstanceTag, verbose_name="实例标签", blank=True) + instance_tag = models.ManyToManyField( + InstanceTag, verbose_name="实例标签", blank=True + ) tunnel = models.ForeignKey( Tunnel, verbose_name="连接隧道", @@ -283,7 +289,9 @@ class SqlWorkflow(models.Model, WorkflowAuditMixin): instance = models.ForeignKey(Instance, on_delete=models.CASCADE) db_name = models.CharField("数据库", max_length=64) syntax_type = models.IntegerField( - "工单类型 0、未知,1、DDL,2、DML", choices=((0, "其他"), (1, "DDL"), (2, "DML")), default=0 + "工单类型 0、未知,1、DDL,2、DML", + choices=((0, "其他"), (1, "DDL"), (2, "DML")), + default=0, ) is_backup = models.BooleanField( "是否备份", @@ -301,7 +309,9 @@ class SqlWorkflow(models.Model, WorkflowAuditMixin): run_date_end = models.DateTimeField("可执行结束时间", null=True, blank=True) create_time = models.DateTimeField("创建时间", auto_now_add=True) finish_time = models.DateTimeField("结束时间", null=True, blank=True) - is_manual = models.IntegerField("是否原生执行", choices=((0, "否"), (1, "是")), default=0) + is_manual = models.IntegerField( + "是否原生执行", choices=((0, "否"), (1, "是")), default=0 + ) def __str__(self): return self.workflow_name @@ -345,7 +355,9 @@ class WorkflowAudit(models.Model): workflow_id = models.BigIntegerField("关联业务id") workflow_type = models.IntegerField("申请类型", choices=WorkflowType.choices) workflow_title = models.CharField("申请标题", max_length=50) - workflow_remark = models.CharField("申请备注", default="", max_length=140, blank=True) + workflow_remark = models.CharField( + "申请备注", default="", max_length=140, blank=True + ) audit_auth_groups = models.CharField("审批权限组列表", max_length=255) current_audit = models.CharField("当前审批权限组", max_length=20) next_audit = models.CharField("下级审批权限组", max_length=20) @@ -432,7 +444,9 @@ class WorkflowLog(models.Model): id = models.AutoField(primary_key=True) audit_id = models.IntegerField("工单审批id", db_index=True) - operation_type = models.SmallIntegerField("操作类型", choices=WorkflowAction.choices) + operation_type = models.SmallIntegerField( + "操作类型", choices=WorkflowAction.choices + ) # operation_type_desc 字段实际无意义 operation_type_desc = models.CharField("操作类型描述", max_length=10) operation_info = models.CharField("操作信息", max_length=1000) @@ -549,7 +563,9 @@ class QueryLog(models.Model): default=False, ) hit_rule = models.BooleanField( - "查询是否命中脱敏规则", choices=((False, "未命中/未知"), (True, "命中")), default=False + "查询是否命中脱敏规则", + choices=((False, "未命中/未知"), (True, "命中")), + default=False, ) masking = models.BooleanField( "查询结果是否正常脱敏", @@ -595,12 +611,16 @@ class DataMaskingColumns(models.Model): column_id = models.AutoField("字段id", primary_key=True) rule_type = models.IntegerField("规则类型", choices=rule_type_choices) - active = models.BooleanField("激活状态", choices=((False, "未激活"), (True, "激活"))) + active = models.BooleanField( + "激活状态", choices=((False, "未激活"), (True, "激活")) + ) instance = models.ForeignKey(Instance, on_delete=models.CASCADE) table_schema = models.CharField("字段所在库名", max_length=64) table_name = models.CharField("字段所在表名", max_length=64) column_name = models.CharField("字段名", max_length=64) - column_comment = models.CharField("字段描述", max_length=1024, default="", blank=True) + column_comment = models.CharField( + "字段描述", max_length=1024, default="", blank=True + ) create_time = models.DateTimeField(auto_now_add=True) sys_time = models.DateTimeField(auto_now=True) @@ -617,7 +637,10 @@ class DataMaskingRules(models.Model): """ rule_type = models.IntegerField("规则类型", choices=rule_type_choices, unique=True) - rule_regex = models.CharField("规则脱敏所用的正则表达式,表达式必须分组,隐藏的组会使用****代替", max_length=255) + rule_regex = models.CharField( + "规则脱敏所用的正则表达式,表达式必须分组,隐藏的组会使用****代替", + max_length=255, + ) hide_group = models.IntegerField("需要隐藏的组") rule_desc = models.CharField("规则描述", max_length=100, default="", blank=True) sys_time = models.DateTimeField(auto_now=True) @@ -636,8 +659,12 @@ class InstanceAccount(models.Model): instance = models.ForeignKey(Instance, on_delete=models.CASCADE) user = fields.EncryptedCharField(verbose_name="账号", max_length=128) - host = models.CharField(verbose_name="主机", max_length=64) # mysql数据库存储主机信息 - db_name = models.CharField(verbose_name="数据库名称", max_length=128) # mongo数据库存储数据库名称 + host = models.CharField( + verbose_name="主机", max_length=64 + ) # mysql数据库存储主机信息 + db_name = models.CharField( + verbose_name="数据库名称", max_length=128 + ) # mongo数据库存储数据库名称 password = fields.EncryptedCharField( verbose_name="密码", max_length=128, default="", blank=True ) @@ -660,7 +687,9 @@ class InstanceDatabase(models.Model): instance = models.ForeignKey(Instance, on_delete=models.CASCADE) db_name = models.CharField("数据库名", max_length=128) owner = models.CharField("负责人", max_length=50, default="", blank=True) - owner_display = models.CharField("负责人中文名", max_length=50, default="", blank=True) + owner_display = models.CharField( + "负责人中文名", max_length=50, default="", blank=True + ) remark = models.CharField("备注", max_length=255, default="", blank=True) sys_time = models.DateTimeField("系统修改时间", auto_now=True) @@ -753,7 +782,9 @@ class ArchiveConfig(models.Model, WorkflowAuditMixin): ) state = models.BooleanField("是否启用归档", default=True) user_name = models.CharField("申请人", max_length=30, blank=True, default="") - user_display = models.CharField("申请人中文名", max_length=50, blank=True, default="") + user_display = models.CharField( + "申请人中文名", max_length=50, blank=True, default="" + ) create_time = models.DateTimeField("创建时间", auto_now_add=True) last_archive_time = models.DateTimeField("最近归档时间", blank=True, null=True) sys_time = models.DateTimeField("系统时间修改", auto_now=True) diff --git a/sql/notify.py b/sql/notify.py index 1ac8c0b784..4ab3ea98bc 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -54,9 +54,9 @@ class My2SqlResult: @dataclass class Notifier: - workflow: Union[ - SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult - ] = None + workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult] = ( + None + ) sys_config: SysConfig = None # init false, class property, 不是 instance property name: str = field(init=False, default="base") @@ -173,9 +173,7 @@ def render_audit(self): auth_ends_at = datetime.datetime.strftime( workflow_detail.valid_date, "%Y-%m-%d %H:%M:%S" ) - workflow_content += ( - f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" - ) + workflow_content += f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" elif workflow_type == WorkflowType.SQL_REVIEW: workflow_type_display = WorkflowType.SQL_REVIEW.label workflow_detail = SqlWorkflow.objects.get(pk=workflow_id) @@ -256,7 +254,9 @@ def render_audit(self): re.sub("[\r\n\f]{2,}", "\n", self.audit_detail.remark), ) elif status == WorkflowStatus.ABORTED: # 审核取消,通知所有审核人 - msg_title = "[{}]提交人主动终止工单#{}".format(workflow_type_display, audit_id) + msg_title = "[{}]提交人主动终止工单#{}".format( + workflow_type_display, audit_id + ) # 接收人,发送给该资源组内对应权限组所有的用户 auth_group_names = [ Group.objects.get(id=auth_group_id).name diff --git a/sql/plugins/plugin.py b/sql/plugins/plugin.py index 3e05a82917..4b3d6a51ea 100644 --- a/sql/plugins/plugin.py +++ b/sql/plugins/plugin.py @@ -32,7 +32,11 @@ def check_args(self, args): # 检查禁用参数 for arg in args.keys(): if arg in self.disable_args: - return {"status": 1, "msg": "{arg}参数已被禁用".format(arg=arg), "data": {}} + return { + "status": 1, + "msg": "{arg}参数已被禁用".format(arg=arg), + "data": {}, + } # 检查必须参数 for req_arg in self.required_args: if req_arg not in args.keys(): diff --git a/sql/plugins/tests.py b/sql/plugins/tests.py index e7c8107fe4..dccdc58f2a 100644 --- a/sql/plugins/tests.py +++ b/sql/plugins/tests.py @@ -58,7 +58,8 @@ def test_check_args_path(self): soar = Soar() args_check_result = soar.check_args(args) self.assertDictEqual( - args_check_result, {"status": 1, "msg": "可执行文件路径不能为空!", "data": {}} + args_check_result, + {"status": 1, "msg": "可执行文件路径不能为空!", "data": {}}, ) # 路径不为空 self.sys_config.set("soar", "/opt/archery/src/plugins/soar") diff --git a/sql/query.py b/sql/query.py index 5baddbdad9..a0747f616b 100644 --- a/sql/query.py +++ b/sql/query.py @@ -148,7 +148,9 @@ def query(request): result["msg"] = f"数据脱敏异常,请联系管理员,错误信息:{msg}" # 关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过 else: - logger.warning(f"数据脱敏异常,按照配置放行,查询语句:{sql_content},错误信息:{msg}") + logger.warning( + f"数据脱敏异常,按照配置放行,查询语句:{sql_content},错误信息:{msg}" + ) query_result.error = None result["data"] = query_result.__dict__ # 无需脱敏的语句 @@ -181,7 +183,9 @@ def query(request): ) query_log.save() except Exception as e: - logger.error(f"查询异常报错,查询语句:{sql_content}\n,错误信息:{traceback.format_exc()}") + logger.error( + f"查询异常报错,查询语句:{sql_content}\n,错误信息:{traceback.format_exc()}" + ) result["status"] = 1 result["msg"] = f"查询异常报错,错误信息:{e}" return HttpResponse(json.dumps(result), content_type="application/json") diff --git a/sql/query_privileges.py b/sql/query_privileges.py index b8598ec6bd..4a13004362 100644 --- a/sql/query_privileges.py +++ b/sql/query_privileges.py @@ -79,9 +79,9 @@ def query_priv_check(user, instance, db_name, sql_content, limit_num): ): # 没有库表查询权限时的staus为2 result["status"] = 2 - result[ - "msg" - ] = f"你无{table['schema']}.{table['name']}表的查询权限!请先到查询权限管理进行申请" + result["msg"] = ( + f"你无{table['schema']}.{table['name']}表的查询权限!请先到查询权限管理进行申请" + ) return result # 获取查询涉及库/表权限的最小limit限制,和前端传参作对比,取最小值 for table in table_ref: @@ -117,7 +117,9 @@ def query_priv_check(user, instance, db_name, sql_content, limit_num): if not _db_priv(user, instance, db_name): # 没有库表查询权限时的staus为2 result["status"] = 2 - result["msg"] = f"你无{db_name}数据库的查询权限!请先到查询权限管理进行申请" + result["msg"] = ( + f"你无{db_name}数据库的查询权限!请先到查询权限管理进行申请" + ) return result # 有所有库权限则获取最小limit值 for db_name in dbs: @@ -242,7 +244,9 @@ def query_priv_apply(request): for db_name in db_list: if _db_priv(user, ins, db_name): result["status"] = 1 - result["msg"] = f"你已拥有{instance_name}实例{db_name}库权限,不能重复申请" + result["msg"] = ( + f"你已拥有{instance_name}实例{db_name}库权限,不能重复申请" + ) return HttpResponse(json.dumps(result), content_type="application/json") # 表权限 @@ -250,13 +254,17 @@ def query_priv_apply(request): # 先检查是否拥有库权限 if _db_priv(user, ins, db_name): result["status"] = 1 - result["msg"] = f"你已拥有{instance_name}实例{db_name}库的全部权限,不能重复申请" + result["msg"] = ( + f"你已拥有{instance_name}实例{db_name}库的全部权限,不能重复申请" + ) return HttpResponse(json.dumps(result), content_type="application/json") # 检查申请账号是否已拥有该表的查询权限 for tb_name in table_list: if _tb_priv(user, ins, db_name, tb_name): result["status"] = 1 - result["msg"] = f"你已拥有{instance_name}实例{db_name}.{tb_name}表的查询权限,不能重复申请" + result["msg"] = ( + f"你已拥有{instance_name}实例{db_name}.{tb_name}表的查询权限,不能重复申请" + ) return HttpResponse(json.dumps(result), content_type="application/json") apply_info = QueryPrivilegesApply( @@ -417,7 +425,9 @@ def query_priv_audit(request): try: audit_status = WorkflowAction(int(request.POST["audit_status"])) except ValueError as e: - return render(request, "error.html", {"errMsg": f"audit_status 参数错误, {str(e)}"}) + return render( + request, "error.html", {"errMsg": f"audit_status 参数错误, {str(e)}"} + ) audit_remark = request.POST.get("audit_remark") if not audit_remark: diff --git a/sql/slowlog.py b/sql/slowlog.py index ae51f7ed7d..3db96c6142 100644 --- a/sql/slowlog.py +++ b/sql/slowlog.py @@ -79,8 +79,12 @@ def slowquery_review(request): MySQLTotalExecutionTimes=Sum( "slowqueryhistory__query_time_sum" ), # 执行总时长 - ParseTotalRowCounts=Sum("slowqueryhistory__rows_examined_sum"), # 扫描总行数 - ReturnTotalRowCounts=Sum("slowqueryhistory__rows_sent_sum"), # 返回总行数 + ParseTotalRowCounts=Sum( + "slowqueryhistory__rows_examined_sum" + ), # 扫描总行数 + ReturnTotalRowCounts=Sum( + "slowqueryhistory__rows_sent_sum" + ), # 返回总行数 ParseRowAvg=Sum("slowqueryhistory__rows_examined_sum") / Sum("slowqueryhistory__ts_cnt"), # 平均扫描行数 ReturnRowAvg=Sum("slowqueryhistory__rows_sent_sum") @@ -158,7 +162,9 @@ def slowquery_review_history(request): sample__icontains=search, **filter_kwargs ).annotate( - ExecutionStartTime=F("ts_min"), # 本次统计(每5分钟一次)该类型sql语句出现的最小时间 + ExecutionStartTime=F( + "ts_min" + ), # 本次统计(每5分钟一次)该类型sql语句出现的最小时间 DBName=F("db_max"), # 数据库名 HostAddress=Concat( V("'"), "user_max", V("'"), V("@"), V("'"), "client_max", V("'") diff --git a/sql/sql_analyze.py b/sql/sql_analyze.py index 876e329052..25ce5716dd 100644 --- a/sql/sql_analyze.py +++ b/sql/sql_analyze.py @@ -61,7 +61,9 @@ def analyze(request): instance_name=instance_name ) except Instance.DoesNotExist: - return JsonResponse({"status": 1, "msg": "你所在组未关联该实例!", "data": []}) + return JsonResponse( + {"status": 1, "msg": "你所在组未关联该实例!", "data": []} + ) soar_test_dsn = SysConfig().get("soar_test_dsn") # 获取实例连接信息 online_dsn = f"{instance.user}:{instance.password}@{instance.host}:{instance.port}/{db_name}" @@ -81,7 +83,9 @@ def analyze(request): try: p = Path(row["sql"].strip()) if p.exists(): - return JsonResponse({"status": 1, "msg": "SQL 语句不合法", "data": []}) + return JsonResponse( + {"status": 1, "msg": "SQL 语句不合法", "data": []} + ) except OSError: pass args["query"] = row["sql"] diff --git a/sql/sql_workflow.py b/sql/sql_workflow.py index 584602aced..b812be7c51 100644 --- a/sql/sql_workflow.py +++ b/sql/sql_workflow.py @@ -154,7 +154,8 @@ def detail_content(request): ReviewResult( id=1, sql=workflow_detail.sqlworkflowcontent.sql_content, - errormessage="Json decode failed." "执行结果Json解析失败, 请联系管理员", + errormessage="Json decode failed." + "执行结果Json解析失败, 请联系管理员", ) ] rows = review_result.json() @@ -164,7 +165,8 @@ def detail_content(request): id=1, sql=workflow_detail.sqlworkflowcontent.sql_content, # 迫于无法单元测试这里加上英文报错信息 - errormessage="Json decode failed." "执行结果Json解析失败, 请联系管理员", + errormessage="Json decode failed." + "执行结果Json解析失败, 请联系管理员", ) ] rows = review_result.json() @@ -252,7 +254,9 @@ def passed(request): WorkflowAction.PASS, request.user, audit_remark ) except AuditException as e: - return render(request, "error.html", {"errMsg": f"审核失败, 错误信息: {str(e)}"}) + return render( + request, "error.html", {"errMsg": f"审核失败, 错误信息: {str(e)}"} + ) if auditor.audit.current_status == WorkflowStatus.PASSED: # 审批流全部走完了, 把工单标记为审核通过 auditor.workflow.status = "workflow_review_pass" @@ -298,7 +302,9 @@ def execute(request): return render(request, "error.html", context) if on_correct_time_period(workflow_id) is False: - context = {"errMsg": "不在可执行时间范围内,如果需要修改执行时间请重新提交工单!"} + context = { + "errMsg": "不在可执行时间范围内,如果需要修改执行时间请重新提交工单!" + } return render(request, "error.html", context) # 获取审核信息 audit_id = Audit.detail_by_workflow_id( @@ -393,7 +399,9 @@ def timing_task(request): schedule_name = f"sqlreview-timing-{workflow_id}" if on_correct_time_period(workflow_id, run_date) is False: - context = {"errMsg": "不在可执行时间范围内,如果需要修改执 行时间请重新提交工单!"} + context = { + "errMsg": "不在可执行时间范围内,如果需要修改执 行时间请重新提交工单!" + } return render(request, "error.html", context) # 使用事务保持数据一致性 diff --git a/sql/test_archiver.py b/sql/test_archiver.py index 6f5025fc1a..419d337253 100644 --- a/sql/test_archiver.py +++ b/sql/test_archiver.py @@ -163,7 +163,8 @@ def test_archive_apply_not_dest_param(self): self.client.force_login(self.superuser) r = self.client.post(path="/archive/apply/", data=data) self.assertDictEqual( - json.loads(r.content), {"status": 1, "msg": "归档到实例时目标实例信息必选!", "data": {}} + json.loads(r.content), + {"status": 1, "msg": "归档到实例时目标实例信息必选!", "data": {}}, ) def test_archive_apply_not_exist_review(self): diff --git a/sql/tests.py b/sql/tests.py index f72ff2e58e..5536223a8f 100644 --- a/sql/tests.py +++ b/sql/tests.py @@ -1044,14 +1044,16 @@ def test_sqladvisor(self, _subprocess): ) r = self.client.post(path="/slowquery/optimize_sqladvisor/") self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "页面提交参数可能为空", "data": []} + json.loads(r.content), + {"status": 1, "msg": "页面提交参数可能为空", "data": []}, ) r = self.client.post( path="/slowquery/optimize_sqladvisor/", data={"sql_content": "select 1;", "instance_name": "test_instance"}, ) self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "请配置SQLAdvisor路径!", "data": []} + json.loads(r.content), + {"status": 1, "msg": "请配置SQLAdvisor路径!", "data": []}, ) self.sys_config.set("sqladvisor", "/opt/archery/src/plugins/sqladvisor") self.sys_config.get_all_config() @@ -1093,7 +1095,8 @@ def test_soar(self, _subprocess): ) r = self.client.post(path="/slowquery/optimize_soar/") self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "页面提交参数可能为空", "data": []} + json.loads(r.content), + {"status": 1, "msg": "页面提交参数可能为空", "data": []}, ) r = self.client.post( path="/slowquery/optimize_soar/", @@ -1133,7 +1136,8 @@ def test_tuning(self): data["instance_name"] = "test_instancex" r = self.client.post(path="/slowquery/optimize_sqltuning/", data=data) self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "你所在组未关联该实例!", "data": []} + json.loads(r.content), + {"status": 1, "msg": "你所在组未关联该实例!", "data": []}, ) # 获取sys_parm @@ -1466,7 +1470,8 @@ def test_my2sql_path_not_exist(self): } r = self.client.post(path="/binlog/my2sql/", data=data) self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "可执行文件路径不能为空!", "data": {}} + json.loads(r.content), + {"status": 1, "msg": "可执行文件路径不能为空!", "data": {}}, ) @patch("sql.plugins.plugin.subprocess") @@ -1560,7 +1565,8 @@ def test_del_binlog_binlog_not_exist(self): data = {"instance_id": self.master.id, "binlog": ""} r = self.client.post(path="/binlog/del_log/", data=data) self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "Error:未选择binlog!", "data": ""} + json.loads(r.content), + {"status": 1, "msg": "Error:未选择binlog!", "data": ""}, ) @patch("sql.engines.mysql.MysqlEngine.query") @@ -1589,7 +1595,8 @@ def test_del_binlog_wrong(self, _get_engine, _query): _query.return_value.error = "清理失败" r = self.client.post(path="/binlog/del_log/", data=data) self.assertEqual( - json.loads(r.content), {"status": 2, "msg": "清理失败,Error:清理失败", "data": ""} + json.loads(r.content), + {"status": 2, "msg": "清理失败,Error:清理失败", "data": ""}, ) @@ -1673,7 +1680,8 @@ def test_param_edit_variable_not_config( } r = self.client.post(path="/param/edit/", data=data) self.assertEqual( - json.loads(r.content), {"data": [], "msg": "请先在参数模板中配置该参数!", "status": 1} + json.loads(r.content), + {"data": [], "msg": "请先在参数模板中配置该参数!", "status": 1}, ) @patch("sql.engines.mysql.MysqlEngine.set_variable") @@ -1703,7 +1711,8 @@ def test_param_edit_variable_not_change( } r = self.client.post(path="/param/edit/", data=data) self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "参数值与实际运行值一致,未调整!", "data": []} + json.loads(r.content), + {"status": 1, "msg": "参数值与实际运行值一致,未调整!", "data": []}, ) @patch("sql.engines.mysql.MysqlEngine.set_variable") @@ -1733,7 +1742,8 @@ def test_param_edit_variable_change( } r = self.client.post(path="/param/edit/", data=data) self.assertEqual( - json.loads(r.content), {"status": 0, "msg": "修改成功,请手动持久化到配置文件!", "data": []} + json.loads(r.content), + {"status": 0, "msg": "修改成功,请手动持久化到配置文件!", "data": []}, ) @patch("sql.engines.mysql.MysqlEngine.set_variable") @@ -1763,7 +1773,8 @@ def test_param_edit_variable_error( } r = self.client.post(path="/param/edit/", data=data) self.assertEqual( - json.loads(r.content), {"status": 1, "msg": "设置错误,错误信息:修改报错", "data": []} + json.loads(r.content), + {"status": 1, "msg": "设置错误,错误信息:修改报错", "data": []}, ) @@ -1777,7 +1788,9 @@ def setUp(self): self.su = User.objects.create( username="s_user", display="中文显示", is_active=True, is_superuser=True ) - self.u1 = User.objects.create(username="user1", display="中文显示", is_active=True) + self.u1 = User.objects.create( + username="user1", display="中文显示", is_active=True + ) self.client = Client() self.client.force_login(self.su) # 使用 travis.ci 时实例和测试service保持一致 @@ -1948,7 +1961,8 @@ def test_export_instance_does_not_exist(self): } r = self.client.get(path="/data_dictionary/export/", data=data) self.assertDictEqual( - json.loads(r.content), {"data": [], "msg": "你所在组未关联该实例!", "status": 1} + json.loads(r.content), + {"data": [], "msg": "你所在组未关联该实例!", "status": 1}, ) @patch("sql.data_dictionary.user_instances") diff --git a/sql/utils/test_workflow_audit.py b/sql/utils/test_workflow_audit.py index c3a0e2f45f..ce6fec1b72 100644 --- a/sql/utils/test_workflow_audit.py +++ b/sql/utils/test_workflow_audit.py @@ -303,7 +303,9 @@ def test_can_review_no_prem_exception( self.audit.workflow_type = WorkflowType.SQL_REVIEW self.audit.workflow_id = self.wf.id self.audit.save() - with self.assertRaisesMessage(Exception, "当前审批auth_group_id不存在,请检查并清洗历史数据"): + with self.assertRaisesMessage( + Exception, "当前审批auth_group_id不存在,请检查并清洗历史数据" + ): Audit.can_review( self.user, self.audit.workflow_id, self.audit.workflow_type ) diff --git a/sql/utils/workflow_audit.py b/sql/utils/workflow_audit.py index 4fbdf10f94..e03bb18371 100644 --- a/sql/utils/workflow_audit.py +++ b/sql/utils/workflow_audit.py @@ -381,7 +381,9 @@ def can_operate(self, action: WorkflowAction, actor: Users): try: audit_auth_group = Group.objects.get(id=self.audit.current_audit) except Group.DoesNotExist: - raise AuditException("当前审批权限组不存在, 请联系管理员检查并清洗错误数据") + raise AuditException( + "当前审批权限组不存在, 请联系管理员检查并清洗错误数据" + ) if not auth_group_users([audit_auth_group.name], self.resource_group_id): raise AuditException("用户不在当前审批审批节点的用户组内, 无权限审核") return True diff --git a/sql/views.py b/sql/views.py index 05b58a052a..b3b6967b15 100644 --- a/sql/views.py +++ b/sql/views.py @@ -282,13 +282,15 @@ def rollback(request): # 返回 response = FileResponse(open(file_name, "rb")) response["Content-Type"] = "application/octet-stream" - response[ - "Content-Disposition" - ] = f'attachment;filename="rollback_{workflow_id}.sql"' + response["Content-Disposition"] = ( + f'attachment;filename="rollback_{workflow_id}.sql"' + ) return response # 异步获取,并在页面展示,如果数据量大加载会缓慢 else: - rollback_workflow_name = f"【回滚工单】原工单Id:{workflow_id} ,{workflow.workflow_name}" + rollback_workflow_name = ( + f"【回滚工单】原工单Id:{workflow_id} ,{workflow.workflow_name}" + ) context = { "workflow_detail": workflow, "rollback_workflow_name": rollback_workflow_name, diff --git a/sql_api/api_instance.py b/sql_api/api_instance.py index 6787ca4149..d25f93713e 100644 --- a/sql_api/api_instance.py +++ b/sql_api/api_instance.py @@ -204,7 +204,9 @@ def post(self, request): db_name=db_name, tb_name=tb_name, schema_name=schema_name ) else: - raise serializers.ValidationError({"errors": "不支持的资源类型或者参数不完整!"}) + raise serializers.ValidationError( + {"errors": "不支持的资源类型或者参数不完整!"} + ) except Exception as msg: raise serializers.ValidationError({"errors": msg}) else: diff --git a/sql_api/api_user.py b/sql_api/api_user.py index 34a4c38224..fcef581dbc 100644 --- a/sql_api/api_user.py +++ b/sql_api/api_user.py @@ -247,7 +247,9 @@ class UserAuth(views.APIView): permission_classes = [IsOwner] - @extend_schema(summary="用户认证校验", request=UserAuthSerializer, description="用户认证校验") + @extend_schema( + summary="用户认证校验", request=UserAuthSerializer, description="用户认证校验" + ) def post(self, request): # 参数验证 serializer = UserAuthSerializer(data=request.data) @@ -323,7 +325,9 @@ class TwoFAState(views.APIView): permission_classes = [IsOwner] @extend_schema( - summary="查询2fa配置情况", request=TwoFAStateSerializer, description="查询2fa配置情况" + summary="查询2fa配置情况", + request=TwoFAStateSerializer, + description="查询2fa配置情况", ) def post(self, request): # 参数验证 diff --git a/sql_api/api_workflow.py b/sql_api/api_workflow.py index baf55e7974..d834a5af30 100644 --- a/sql_api/api_workflow.py +++ b/sql_api/api_workflow.py @@ -215,7 +215,9 @@ class AuditWorkflow(views.APIView): """ @extend_schema( - summary="审核工单", request=AuditWorkflowSerializer, description="审核一条工单(通过或终止)" + summary="审核工单", + request=AuditWorkflowSerializer, + description="审核一条工单(通过或终止)", ) def post(self, request): # 参数验证 @@ -303,7 +305,9 @@ class ExecuteWorkflow(views.APIView): """ @extend_schema( - summary="执行工单", request=ExecuteWorkflowSerializer, description="执行一条工单" + summary="执行工单", + request=ExecuteWorkflowSerializer, + description="执行一条工单", ) def post(self, request): # 参数验证 @@ -332,7 +336,9 @@ def post(self, request): if on_correct_time_period(workflow_id) is False: raise serializers.ValidationError( - {"errors": "不在可执行时间范围内,如果需要修改执行时间请重新提交工单!"} + { + "errors": "不在可执行时间范围内,如果需要修改执行时间请重新提交工单!" + } ) # 获取审核信息 diff --git a/sql_api/serializers.py b/sql_api/serializers.py index 56dfc31c72..051d38a74f 100644 --- a/sql_api/serializers.py +++ b/sql_api/serializers.py @@ -294,7 +294,9 @@ def validate_instance_id(self, instance_id): try: Instance.objects.get(pk=instance_id) except Instance.DoesNotExist: - raise serializers.ValidationError({"errors": f"不存在该实例:{instance_id}"}) + raise serializers.ValidationError( + {"errors": f"不存在该实例:{instance_id}"} + ) return instance_id def get_instance(self): @@ -453,7 +455,8 @@ class AuditWorkflowSerializer(serializers.Serializer): workflow_id = serializers.IntegerField(label="工单id") audit_remark = serializers.CharField(label="审批备注") workflow_type = serializers.ChoiceField( - choices=WorkflowType.choices, label="工单类型:1-查询权限申请,2-SQL上线申请,3-数据归档申请" + choices=WorkflowType.choices, + label="工单类型:1-查询权限申请,2-SQL上线申请,3-数据归档申请", ) audit_type = serializers.ChoiceField(choices=["pass", "cancel"], label="审核类型") @@ -504,7 +507,8 @@ class Meta: class WorkflowLogSerializer(serializers.Serializer): workflow_id = serializers.IntegerField(label="工单id") workflow_type = serializers.ChoiceField( - choices=[1, 2, 3], label="工单类型:1-查询权限申请,2-SQL上线申请,3-数据归档申请" + choices=[1, 2, 3], + label="工单类型:1-查询权限申请,2-SQL上线申请,3-数据归档申请", ) def validate(self, attrs): @@ -539,7 +543,9 @@ class ExecuteWorkflowSerializer(serializers.Serializer): choices=[2, 3], label="工单类型:1-查询权限申请,2-SQL上线申请,3-数据归档申请" ) mode = serializers.ChoiceField( - choices=["auto", "manual"], label="执行模式:auto-线上执行,manual-已手动执行", required=False + choices=["auto", "manual"], + label="执行模式:auto-线上执行,manual-已手动执行", + required=False, ) def validate(self, attrs): @@ -558,7 +564,9 @@ def validate(self, attrs): try: Users.objects.get(username=engineer) except Users.DoesNotExist: - raise serializers.ValidationError({"errors": f"不存在该用户:{engineer}"}) + raise serializers.ValidationError( + {"errors": f"不存在该用户:{engineer}"} + ) try: WorkflowAudit.objects.get( diff --git a/sql_api/views.py b/sql_api/views.py index 5038969bdf..ba9bc276d5 100644 --- a/sql_api/views.py +++ b/sql_api/views.py @@ -103,9 +103,11 @@ def debug(request): django_q_info = { "version": django_q_version, "conf": django_q.conf.Conf.conf, - "q_cluster_stats": q_cluster_stats - if q_cluster_stats - else "没有正在运行的集群信息,请检查django_q状态", + "q_cluster_stats": ( + q_cluster_stats + if q_cluster_stats + else "没有正在运行的集群信息,请检查django_q状态" + ), "q_broker_stats": q_broker_stats, } except Exception as e: