Skip to content

Commit

Permalink
binlog2sql以plugin的形式对接,降低代码复杂度
Browse files Browse the repository at this point in the history
  • Loading branch information
hhyo committed Mar 23, 2019
1 parent cc21fcb commit fb8db9a
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 517 deletions.
70 changes: 37 additions & 33 deletions common/templates/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ <h5 style="color: darkgrey"><b>SQL查询</b></h5>
<input id="disable_star"
key="disable_star"
value="{{ config.disable_star }}" type="checkbox">
禁止在查询时使用 *
禁止在查询时使用 *
</label>
</div>
</div>
Expand Down Expand Up @@ -287,6 +287,21 @@ <h5 style="color: darkgrey"><b>SQL优化</b></h5>
<h4 style="color: darkgrey;display: inline"><b>通知配置</b></h4>&nbsp;&nbsp;&nbsp;
<button id='check_email' class='btn-sm btn-info'>测试EMAIL连接</button>
<hr/>
<div class="form-group">
<label for="archery_base_url"
class="col-sm-4 control-label">ARCHERY_BASE_URL</label>
<div class="col-sm-8">
<div class="switch switch-small">
<label>
<input type="text" class="form-control"
id="archery_base_url"
key="archery_base_url"
value="{{ config.archery_base_url }}"
placeholder="http://archery.com">系统首页地址, 用于钉钉和邮件发送链接使用
</label>
</div>
</div>
</div>
<div class="form-horizontal">
<div class="form-group">
<label for="mail"
Expand Down Expand Up @@ -398,6 +413,17 @@ <h4 style="color: darkgrey"><b>其他配置</b></h4>
placeholder="schemasync调用路径">
</div>
</div>
<div class="form-group">
<label for="lock_time_threshold"
class="col-sm-4 control-label">BINLOG2SQL</label>
<div class="col-sm-5">
<input type="text" class="form-control"
id="binlog2sql"
key="binlog2sql"
value="{{ config.binlog2sql }}"
placeholder="binlog2sql调用路径,类似/opt/binlog2sql.py">
</div>
</div>
<div class="form-group">
<label for="default_auth_group"
class="col-sm-4 control-label">DEFAULT_AUTH_GROUP</label>
Expand Down Expand Up @@ -448,27 +474,13 @@ <h4 style="color: darkgrey"><b>其他配置</b></h4>
<div class="col-sm-8">
<div class="switch switch-small">
<label>
<input id="sign_up_enabled" key="sign_up_enabled" value="{{ config.sign_up_enabled }}"
<input id="sign_up_enabled" key="sign_up_enabled"
value="{{ config.sign_up_enabled }}"
type="checkbox"> 是否开启注册功能
</label>
</div>
</div>
</div>
<div class="form-group">
<label for="archery_base_url"
class="col-sm-4 control-label">ARCHERY_BASE_URL</label>
<div class="col-sm-8">
<div class="switch switch-small">
<label>
<input type="text" class="form-control"
id="archery_base_url"
key="archery_base_url"
value="{{ config.archery_base_url }}"
placeholder="http://archery.com">系统首页地址, 用于钉钉和邮件发送链接使用
</label>
</div>
</div>
</div>
</div>
<div class="form-group">
<div class="col-sm-offset-4 col-sm-10">
Expand Down Expand Up @@ -526,13 +538,11 @@ <h5 class="control-label text-bold">当前审批流程:<b id="workflow_auditor
$('input[type="checkbox"]').each(function (i) {
if ($(this).val() === 'true') {
$(this).bootstrapSwitch('state', true);
}
else {
} else {
$(this).bootstrapSwitch('state', false);
}
});
}
else if ($("#config").val() === '1') {
} else if ($("#config").val() === '1') {
$("#div-system-config").hide();
$("#div-workflow").show();
$("#div-workflow-config").show();
Expand All @@ -546,18 +556,15 @@ <h5 class="control-label text-bold">当前审批流程:<b id="workflow_auditor
$(this).val(true);
if ($(this).attr("id") === 'mail') {
$("#div-mail-config").show();
}
else if ($(this).attr("id") === 'auto_review') {
} else if ($(this).attr("id") === 'auto_review') {
$("#div-auto-review-config").show();
}
}
else {
} else {
$(this).val(false);
if ($(this).attr("id") === 'mail') {
$("#mail_ssl").val(false);
$("#div-mail-config").hide();
}
else if ($(this).attr("id") === 'auto_review') {
} else if ($(this).attr("id") === 'auto_review') {
$("#div-auto-review-config").hide();
}
}
Expand Down Expand Up @@ -611,8 +618,7 @@ <h5 class="control-label text-bold">当前审批流程:<b id="workflow_auditor
var auditors = $("#workflow_auditors_text").val();
if (auditors) {
$("#workflow_auditors_text").val(auditors + '->' + auth_group);
}
else {
} else {
$("#workflow_auditors_text").val(auth_group)
}
$('#group_auditors').selectpicker('render');
Expand Down Expand Up @@ -687,8 +693,7 @@ <h5 class="control-label text-bold">当前审批流程:<b id="workflow_auditor
alert(errorThrown);
}
});
}
else {
} else {
alert('请选择组、工单类型和审批流程!')
}

Expand Down Expand Up @@ -764,8 +769,7 @@ <h5 class="control-label text-bold">当前审批流程:<b id="workflow_auditor
$(document).ready(function () {
if (sessionStorage.getItem('config_type')) {
$("#config").val(sessionStorage.getItem('config_type')).trigger("change")
}
else if ($("#config").val()) {
} else if ($("#config").val()) {
$("#config").trigger("change")
}
})
Expand Down
119 changes: 80 additions & 39 deletions sql/binlog2sql.py → sql/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
from django.conf import settings
from django.contrib.auth.decorators import permission_required
from django.http import HttpResponse
from django_q.tasks import async_task

from common.utils.extend_json_encoder import ExtendJSONEncoder
from sql.engines import get_engine
from sql.utils.binlog2sql.binlog2sql import Binlog2sql

from sql.plugins.binglog2sql import Binlog2Sql
from .models import Instance

logger = logging.getLogger('default')
Expand Down Expand Up @@ -48,11 +50,12 @@ def binlog_list(request):
@permission_required('sql.menu_binlog2sql', raise_exception=True)
def binlog2sql(request):
instance_name = request.POST.get('instance_name')
save_sql = True if request.POST.get('save_sql') == 'true' else False
instance = Instance.objects.get(instance_name=instance_name)
conn_setting = {'host': instance.host, 'port': int(instance.port), 'user': instance.user,
'passwd': instance.raw_password, 'charset': 'utf8'}
no_pk = True if request.POST.get('no_pk') == 'true' else False
flashback = True if request.POST.get('flashback') == 'true' else False
back_interval = 0 if request.POST.get('back_interval') == '' else int(request.POST.get('back_interval'))
num = 500 if request.POST.get('num') == '' else int(request.POST.get('num'))
start_file = request.POST.get('start_file')
start_pos = request.POST.get('start_pos') if request.POST.get('start_pos') == '' else int(
request.POST.get('start_pos'))
Expand All @@ -68,50 +71,88 @@ def binlog2sql(request):

# flashback=True获取DML回滚语句
result = {'status': 0, 'msg': 'ok', 'data': ''}

# 提交给binlog2sql进行解析
binlog2sql = Binlog2Sql()
# 准备参数
args = {"conn_options": fr"-h{instance.host} -u{instance.user} -p'{instance.raw_password}' -P{instance.port} ",
"stop_never": False,
"no-primary-key": no_pk,
"flashback": flashback,
"back-interval": back_interval,
"start-file": start_file,
"start-position": start_pos,
"stop-file": end_file,
"stop-position": end_pos,
"start-datetime": start_time,
"stop-datetime": stop_time,
"databases": ' '.join(only_schemas),
"tables": ' '.join(only_tables),
"only-dml": only_dml,
"sql-type": ' '.join(sql_type),
"instance": instance
}

# 参数检查
args_check_result = binlog2sql.check_args(args)
if args_check_result['status'] == 1:
return HttpResponse(json.dumps(args_check_result), content_type='application/json')
# 参数转换
cmd_args = binlog2sql.generate_args2cmd(args, shell=True)
# 执行命令
try:
binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=start_file, start_pos=start_pos,
end_file=end_file, end_pos=end_pos, start_time=start_time,
stop_time=stop_time, only_schemas=' '.join(only_schemas),
only_tables=' '.join(only_tables),
no_pk=no_pk, flashback=flashback, stop_never=False,
back_interval=1.0, only_dml=only_dml, sql_type=sql_type)
timestamp = int(time.time())
path = os.path.join(settings.BASE_DIR, 'downloads/binlog2sql/')
if flashback:
filename = os.path.join(path, 'flashback_{}_{}_{}.sql'.format(conn_setting['host'],
conn_setting['port'],
timestamp))
else:
filename = os.path.join(path, '{}_{}_{}.sql'.format(conn_setting['host'],
conn_setting['port'],
timestamp))
# 获取sql语句,忽略wait_timeout的错误
try:
binlog2sql.process_binlog(filename)
except pymysql.err.OperationalError:
logger.error(traceback.format_exc())

# 读取前5000行
p = binlog2sql.execute_cmd(cmd_args, shell=True)
# 读取前500行
rows = []
n = 1
with open(filename) as f:
for row in f:
if n <= 5000:
row_info = {}
try:
row_info['sql'] = row.split('; #')[0] + ";"
row_info['binlog_info'] = row.split('; #')[1].rstrip('\"')
except Exception:
row_info['sql'] = row
row_info['binlog_info'] = None
rows.append(row_info)
n = n + 1
for line in iter(p.stdout.readline, ''):
if n <= num:
n = n + 1
row_info = {}
try:
row_info['sql'] = line.split('; #')[0] + ";"
row_info['binlog_info'] = line.split('; #')[1].rstrip('\"')
except IndexError:
row_info['sql'] = line
row_info['binlog_info'] = None
rows.append(row_info)
else:
break
# 终止子进程
p.kill()
result['data'] = rows
except Exception as e:
except RuntimeError as e:
logger.error(traceback.format_exc())
result['status'] = 1
result['msg'] = str(e)

# 保存到文件
if save_sql:
async_task(binlog2sql_file, args=args)

# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type='application/json')


def binlog2sql_file(args):
"""
:param cmd_args: 生成好的命令
:return:
"""
binlog2sql = Binlog2Sql()
instance = args.get('instance')
timestamp = int(time.time())
path = os.path.join(settings.BASE_DIR, 'downloads/binlog2sql/')
if args.get('flashback'):
filename = os.path.join(path, f"flashback_{instance.host}_{instance.port}_{timestamp}.sql")
else:
filename = os.path.join(path, f"{instance.host}_{instance.port}_{timestamp}.sql")

# 参数转换
cmd_args = binlog2sql.generate_args2cmd(args, shell=True)
# 执行命令保存到文件
with open(filename, 'w') as f:
p = binlog2sql.execute_cmd(cmd_args, shell=True)
for c in iter(lambda: p.stdout.read(1), ''):
f.write(c)
3 changes: 2 additions & 1 deletion sql/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def schemasync(request):
cmd_args = schema_sync.generate_args2cmd(args, shell=True)
# 执行命令
try:
diff_stdout = schema_sync.execute_cmd(cmd_args, shell=True)
stdout, stderr = schema_sync.execute_cmd(cmd_args, shell=True).communicate()
diff_stdout = f'{stdout}{stderr}'
except RuntimeError as e:
diff_stdout = str(e)

Expand Down
61 changes: 61 additions & 0 deletions sql/plugins/binglog2sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# -*- coding: UTF-8 -*-
"""
@author: hhyo
@license: Apache Licence
@file: binglog2sql.py
@time: 2019/03/23
"""
from common.config import SysConfig
from sql.plugins.plugin import Plugin

__author__ = 'hhyo'


class Binlog2Sql(Plugin):

def __init__(self):
self.path = SysConfig().get('binlog2sql')
self.required_args = []
self.disable_args = []
super(Plugin, self).__init__()

def generate_args2cmd(self, args, shell):
"""
转换请求参数为命令行
:param args:
:param shell:
:return:
"""
conn_options = ['conn_options']
parse_mode_options = ['stop-never', 'no-primary-key', 'flashback', 'back-interval']
range_options = ['start-file', 'start-position', 'stop-file', 'stop-position', 'start-datetime',
'stop-datetime']
filter_options = ['databases', 'tables', 'only-dml', 'sql-type']
if shell:
cmd_args = f'python {self.path}' if self.path else ''
for name, value in args.items():
if name in conn_options:
cmd_args += f' {value}'
elif name in parse_mode_options and value:
cmd_args += f' --{name}'
elif name in range_options and value:
cmd_args += f' --{name}={value}'
elif name in filter_options and value:
if name == 'only-dml':
cmd_args += f' --{name}'
else:
cmd_args += f' --{name} {value}'
else:
cmd_args = [self.path]
for name, value in args.items():
if name in conn_options:
cmd_args.append(f'{value}')
elif name in parse_mode_options:
cmd_args.append(f'--{name}')
elif name in range_options:
cmd_args.append(f'--{name}')
cmd_args.append(f'{value}')
elif name in filter_options:
cmd_args.append(f'--{name}')
cmd_args.append(f'{value}')
return cmd_args
7 changes: 3 additions & 4 deletions sql/plugins/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@ def generate_args2cmd(self, args, shell):
@staticmethod
def execute_cmd(cmd_args, shell):
"""
执行命令并且返回执行信息
执行命令并且返回process
:return:
"""
try:
p = subprocess.Popen(cmd_args,
shell=shell,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stderr=subprocess.PIPE,
universal_newlines=True)
stdout, stderr = p.communicate()
return stdout
return p
except Exception as e:
logger.error("命令执行失败\n{}".format(traceback.format_exc()))
raise RuntimeError('命令执行失败,失败原因:%s' % str(e))
Loading

0 comments on commit fb8db9a

Please sign in to comment.