Skip to content

Commit

Permalink
支持ClickHouse上线
Browse files Browse the repository at this point in the history
支持ClickHouse上线
  • Loading branch information
nick2wang committed Mar 1, 2022
1 parent 8e2a196 commit ed9e709
Show file tree
Hide file tree
Showing 4 changed files with 336 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Archery是[archer](https://github.com/jly8866/archer)的分支项目,定位于
| MongoDB |||| × | × | × | × | × | × | × |
| Phoenix || × || × | × | × | × | × | × | × |
| ODPS || × | × | × | × | × | × | × | × | × |
| ClickHouse || × | × | × | × | × | × | × | × | × |
| ClickHouse || | | × | × | × | × | × | × | × |



Expand Down
238 changes: 236 additions & 2 deletions sql/engines/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# -*- coding: UTF-8 -*-
from clickhouse_driver import connect
from sql.utils.sql_utils import get_syntax_type
from .models import ResultSet, ReviewResult, ReviewSet
from common.utils.timer import FuncTimer
from common.config import SysConfig
from . import EngineBase
from .models import ResultSet
import sqlparse
import logging
import re
Expand All @@ -13,6 +16,7 @@ class ClickHouseEngine(EngineBase):

def __init__(self, instance=None):
super(ClickHouseEngine, self).__init__(instance=instance)
self.config = SysConfig()

def get_connection(self, db_name=None):
if self.conn:
Expand Down Expand Up @@ -45,12 +49,25 @@ def server_version(self):
version = result.rows[0][0].split(' ')[1]
return tuple([int(n) for n in version.split('.')[:3]])

def get_table_engine(self, tb_name):
"""获取某个table的engine type"""
sql = f"""select engine
from system.tables
where database='{tb_name.split('.')[0]}'
and name='{tb_name.split('.')[1]}'"""
query_result = self.query(sql=sql)
if query_result.rows:
result = {'status': 1, 'engine': query_result.rows[0][0]}
else:
result = {'status': 0, 'engine': 'None'}
return result

def get_all_databases(self):
"""获取数据库列表, 返回一个ResultSet"""
sql = "show databases"
result = self.query(sql=sql)
db_list = [row[0] for row in result.rows
if row[0] not in ('system','INFORMATION_SCHEMA', 'information_schema', 'datasets')]
if row[0] not in ('system', 'INFORMATION_SCHEMA', 'information_schema', 'datasets')]
result.rows = db_list
return result

Expand Down Expand Up @@ -170,6 +187,223 @@ def filter_sql(self, sql='', limit_num=0):
sql = f'{sql};'
return sql

def explain_check(self, check_result, db_name=None, line=0, statement=''):
"""使用explain ast检查sql语法, 返回Review set"""
result = ReviewResult(id=line, errlevel=0,
stagestatus='Audit completed',
errormessage='None',
sql=statement,
affected_rows=0,
execute_time=0, )
# clickhouse版本>=20.6.3才使用explain检查
if self.server_version >= (20, 6, 3):
explain_result = self.query(db_name=db_name, sql=f"explain ast {statement}")
if explain_result.error:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回未通过检查SQL',
errormessage=f'explain语法检查错误:{explain_result.error}',
sql=statement)
return result

def execute_check(self, db_name=None, sql=''):
"""上线单执行前的检查, 返回Review set"""
sql = sqlparse.format(sql, strip_comments=True)
sql_list = sqlparse.split(sql)

# 禁用/高危语句检查
check_result = ReviewSet(full_sql=sql)
line = 1
critical_ddl_regex = self.config.get('critical_ddl_regex', '')
p = re.compile(critical_ddl_regex)
check_result.syntax_type = 2 # TODO 工单类型 0、其他 1、DDL,2、DML

for statement in sql_list:
statement = statement.rstrip(';')
# 禁用语句
if re.match(r"^select|^show", statement.lower()):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持语句',
errormessage='仅支持DML和DDL语句,查询语句请使用SQL查询功能!',
sql=statement)
# 高危语句
elif critical_ddl_regex and p.match(statement.strip().lower()):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回高危SQL',
errormessage='禁止提交匹配' + critical_ddl_regex + '条件的语句!',
sql=statement)
# alter语句
elif re.match(r"^alter", statement.lower()):
# alter table语句
if re.match(r"^alter\s+table\s+(.+?)\s+", statement.lower()):
table_name = re.match(r"^alter\s+table\s+(.+?)\s+", statement.lower(), re.M).group(1)
if '.' not in table_name:
table_name = f"{db_name}.{table_name}"
table_engine = self.get_table_engine(table_name)['engine']
table_exist = self.get_table_engine(table_name)['status']
if table_exist == 1:
if not table_engine.endswith('MergeTree') and table_engine not in ('Merge', 'Distributed'):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='ALTER TABLE仅支持*MergeTree,Merge以及Distributed等引擎表!',
sql=statement)
else:
# delete与update语句,实际是alter语句的变种
if re.match(r"^alter\s+table\s+(.+?)\s+(delete|update)\s+", statement.lower()):
if not table_engine.endswith('MergeTree'):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='DELETE与UPDATE仅支持*MergeTree引擎表!',
sql=statement)
else:
result = self.explain_check(check_result, db_name, line, statement)
else:
result = self.explain_check(check_result, db_name, line, statement)
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='表不存在',
errormessage=f'表 {table_name} 不存在!',
sql=statement)
# 其他alter语句
else:
result = self.explain_check(check_result, db_name, line, statement)
# truncate语句
elif re.match(r"^truncate\s+table\s+(.+?)(\s|$)", statement.lower()):
table_name = re.match(r"^truncate\s+table\s+(.+?)(\s|$)", statement.lower(), re.M).group(1)
if '.' not in table_name:
table_name = f"{db_name}.{table_name}"
table_engine = self.get_table_engine(table_name)['engine']
table_exist = self.get_table_engine(table_name)['status']
if table_exist == 1:
if table_engine in ('View', 'File,', 'URL', 'Buffer', 'Null'):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='TRUNCATE不支持View,File,URL,Buffer和Null表引擎!',
sql=statement)
else:
result = self.explain_check(check_result, db_name, line, statement)
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='表不存在',
errormessage=f'表 {table_name} 不存在!',
sql=statement)
# insert语句,explain无法正确判断,暂时只做表存在性检查与简单关键字匹配
elif re.match(r"^insert", statement.lower()):
if re.match(r"^insert\s+into\s+(.+?)(\s+|\s*\(.+?)(values|format|select)(\s+|\()", statement.lower()):
table_name = re.match(r"^insert\s+into\s+(.+?)(\s+|\s*\(.+?)(values|format|select)(\s+|\()",
statement.lower(), re.M).group(1)
if '.' not in table_name:
table_name = f"{db_name}.{table_name}"
table_exist = self.get_table_engine(table_name)['status']
if table_exist == 1:
result = ReviewResult(id=line, errlevel=0,
stagestatus='Audit completed',
errormessage='None',
sql=statement,
affected_rows=0,
execute_time=0, )
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='表不存在',
errormessage=f'表 {table_name} 不存在!',
sql=statement)
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='INSERT语法不正确!',
sql=statement)
# 其他语句使用explain ast简单检查
else:
result = self.explain_check(check_result, db_name, line, statement)

# 没有找出DDL语句的才继续执行此判断
if check_result.syntax_type == 2:
if get_syntax_type(statement, parser=False, db_type='mysql') == 'DDL':
check_result.syntax_type = 1
check_result.rows += [result]

# 遇到禁用和高危语句直接返回
if check_result.is_critical:
check_result.error_count += 1
return check_result
line += 1
return check_result

def execute_workflow(self, workflow):
"""执行上线单,返回Review set"""
sql = workflow.sqlworkflowcontent.sql_content
execute_result = ReviewSet(full_sql=sql)
sqls = sqlparse.format(sql, strip_comments=True)
sql_list = sqlparse.split(sqls)

line = 1
for statement in sql_list:
with FuncTimer() as t:
result = self.execute(db_name=workflow.db_name, sql=statement, close_conn=True)
if not result.error:
execute_result.rows.append(ReviewResult(
id=line,
errlevel=0,
stagestatus='Execute Successfully',
errormessage='None',
sql=statement,
affected_rows=0,
execute_time=t.cost,
))
line += 1
else:
# 追加当前报错语句信息到执行结果中
execute_result.error = result.error
execute_result.rows.append(ReviewResult(
id=line,
errlevel=2,
stagestatus='Execute Failed',
errormessage=f'异常信息:{result.error}',
sql=statement,
affected_rows=0,
execute_time=0,
))
line += 1
# 报错语句后面的语句标记为审核通过、未执行,追加到执行结果中
for statement in sql_list[line - 1:]:
execute_result.rows.append(ReviewResult(
id=line,
errlevel=0,
stagestatus='Audit completed',
errormessage=f'前序语句失败, 未执行',
sql=statement,
affected_rows=0,
execute_time=0,
))
line += 1
break
return execute_result

def execute(self, db_name=None, sql='', close_conn=True):
"""原生执行语句"""
result = ResultSet(full_sql=sql)
conn = self.get_connection(db_name=db_name)
try:
cursor = conn.cursor()
for statement in sqlparse.split(sql):
cursor.execute(statement)
cursor.close()
except Exception as e:
logger.warning(f"ClickHouse语句执行报错,语句:{sql},错误信息{e}")
result.error = str(e).split('Stack trace')[0]
if close_conn:
self.close()
return result

def close(self):
if self.conn:
self.conn.close()
Expand Down
Loading

0 comments on commit ed9e709

Please sign in to comment.