Skip to content

Commit

Permalink
支持mongodb进程状态查看
Browse files Browse the repository at this point in the history
  • Loading branch information
weideguo committed Jun 2, 2022
1 parent 3aff65d commit 8b7bbf9
Show file tree
Hide file tree
Showing 4 changed files with 425 additions and 116 deletions.
23 changes: 23 additions & 0 deletions common/static/dist/js/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@ var dateFormat = function(fmt, date) {
return fmt;
};

// 格式与高亮json格式的字符串
var jsonHighLight = function(json) {
json = json.toString().replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;');
return json.replace(/("(\\u[a-zA-Z0-9]{4}|\\[^u]|[^\\"])*"(\s*:)?|\b(true|false|null)\b|-?\d+(?:\.\d*)?(?:[eE][+\-]?\d+)?)/g, function (match) {
var cls = 'text-muted';
if (/^"/.test(match)) {
if (/:$/.test(match)) {
cls = 'text-success';
} else {
match = match
cls = 'text-primary';
}
} else if (/true|false/.test(match)) {
cls = 'text-success';
} else if (/null/.test(match)) {
cls = 'text-warning';
}
return '<span class="' + cls + '">' + match + '</span>';
});
};

// 这个函数存在报错因此不应该把任何模块放在这个模块之后
// 实例配置页面根据db_type选择显示或隐藏mode字段mode字段只适用于redis实例
(function($) {
$(function() {
Expand All @@ -41,3 +63,4 @@ var dateFormat = function(fmt, date) {
});
});
})(django.jQuery);

33 changes: 33 additions & 0 deletions common/utils/extend_json_encoder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# -*- coding: UTF-8 -*-
import base64
import simplejson as json

from decimal import Decimal
from datetime import datetime, date, timedelta
from functools import singledispatch
from ipaddress import IPv4Address, IPv6Address
from uuid import UUID
from bson.objectid import ObjectId
from bson.timestamp import Timestamp


@singledispatch
Expand Down Expand Up @@ -58,6 +61,16 @@ def _(o):
return str(o)


@convert.register(ObjectId)
def _(o):
return str(o)


@convert.register(Timestamp)
def _(o):
return str(o)


class ExtendJSONEncoder(json.JSONEncoder):
def default(self, obj):
try:
Expand All @@ -76,3 +89,23 @@ def default(self, obj):
return convert(obj)
except TypeError:
return super(ExtendJSONEncoderFTime, self).default(obj)


# 使用simplejson处理形如 b'\xaa' 的bytes类型数据会失败,但使用json模块构造这个对象时不能使用bigint_as_string方法
import json
class ExtendJSONEncoderBytes(json.JSONEncoder):
def default(self, obj):
try:
# 使用convert.register处理会报错 ValueError: Circular reference detected
# 不是utf-8格式的bytes格式需要先进行base64编码转换
if isinstance(obj, bytes):
try:
return o.decode('utf-8')
except:
return base64.b64encode(obj).decode('utf-8')
else:
return convert(obj)
except TypeError:
print(type(obj))
return super(ExtendJSONEncoderBytes, self).default(obj)

201 changes: 150 additions & 51 deletions sql/db_diagnostic.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import logging
import traceback
import MySQLdb

import simplejson as json
#import simplejson as json
import json
from django.contrib.auth.decorators import permission_required

from django.http import HttpResponse

from sql.engines import get_engine
from common.utils.extend_json_encoder import ExtendJSONEncoder
from common.utils.extend_json_encoder import ExtendJSONEncoder, ExtendJSONEncoderBytes
from sql.utils.resource_group import user_instances
from .models import AliyunRdsConfig, Instance

from .aliyun_rds import process_status as aliyun_process_status, create_kill_session as aliyun_create_kill_session, \
kill_session as aliyun_kill_session, sapce_status as aliyun_sapce_status

logger = logging.getLogger('default')

# 问题诊断--进程列表
@permission_required('sql.process_view', raise_exception=True)
Expand All @@ -21,94 +24,178 @@ def process(request):
command_type = request.POST.get('command_type')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

base_sql = "select id, user, host, db, command, time, state, ifnull(info,'') as info from information_schema.processlist"
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_process_status(request)
else:
# escape
command_type = MySQLdb.escape_string(command_type).decode('utf-8')

if command_type == 'All':
sql = base_sql + ";"
elif command_type == 'Not Sleep':
sql = "{} where command<>'Sleep';".format(base_sql)
query_engine = get_engine(instance=instance)
query_result = None
if instance.db_type == 'mysql':
base_sql = "select id, user, host, db, command, time, state, ifnull(info,'') as info from information_schema.processlist"
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_process_status(request)
else:
sql = "{} where command= '{}';".format(base_sql, command_type)
query_engine = get_engine(instance=instance)
query_result = query_engine.query('information_schema', sql)
# escape
command_type = MySQLdb.escape_string(command_type).decode('utf-8')
if not command_type:
command_type = 'Query'
if command_type == 'All':
sql = base_sql + ";"
elif command_type == 'Not Sleep':
sql = "{} where command<>'Sleep';".format(base_sql)
else:
sql = "{} where command= '{}';".format(base_sql, command_type)

query_result = query_engine.query('information_schema', sql)

elif instance.db_type == 'mongo':
try:
# 这种执行方法不行 query_engine.query('admin', sql)
conn = query_engine.get_connection()
processlists = []
# Full 包含活跃与不活跃的连接,包含内部的连接,即全部的连接状态
# All 包含活跃与不活跃的连接,不包含内部的连接
# Active 包含活跃
# Inner 内部连接
if not command_type:
command_type = 'Active'
if command_type in ['Full','All','Inner']:
idle_connections = True
else:
idle_connections = False

# conn.admin.current_op() 这个方法已经被pymongo废除,但mongodb3.6+才支持aggregate
with conn.admin.aggregate([{"$currentOp": {"allUsers":True,"idleConnections":idle_connections}}]) as cursor:
for operation in cursor:
# 对sharding集群的特殊处理
if not 'client' in operation and operation.get('clientMetadata',{}).get('mongos',{}).get('client',{}):
operation['client'] = operation['clientMetadata']['mongos']['client']

# client_s 只是处理的mongos,并不是实际客户端
# client 在sharding获取不到?
if command_type in ['Full']:
processlists.append(operation)
elif command_type in ['All','Active']:
if 'clientMetadata' in operation:
processlists.append(operation)
elif command_type in ['Inner']:
if not 'clientMetadata' in operation:
processlists.append(operation)
result = {'status': 0, 'msg': 'ok', 'rows': processlists}
except Exception as e:
logger.warning(f"mongodb获取连接信息错误,错误信息{traceback.format_exc()}")
result = {'status': 1, 'msg': str(e)}

else:
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的进程列表查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if query_result:
if not query_result.error:
processlist = query_result.to_dict()
result = {'status': 0, 'msg': 'ok', 'rows': processlist}
else:
result = {'status': 1, 'msg': query_result.error}

# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
# ExtendJSONEncoderBytes 使用json模块,bigint_as_string只支持simplejson
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoderBytes),
content_type='application/json')


# 问题诊断--通过进程id构建请求
# 问题诊断--通过线程id构建请求 这里只是用于确定将要kill的线程id还在运行
@permission_required('sql.process_kill', raise_exception=True)
def create_kill_session(request):
instance_name = request.POST.get('instance_name')
thread_ids = request.POST.get('ThreadIDs')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

result = {'status': 0, 'msg': 'ok', 'data': []}
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_create_kill_session(request)
query_engine = get_engine(instance=instance)
if instance.db_type == 'mysql':
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_create_kill_session(request)
else:
thread_ids = json.loads(thread_ids)

sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = query_engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
result['data'] = kill_sql

elif instance.db_type == 'mongo':
conn = query_engine.get_connection()
active_opid = []
_opids = json.loads(thread_ids)
with conn.admin.aggregate([{"$currentOp": {"allUsers":True,"idleConnections": False}}]) as cursor:
for operation in cursor:
if 'opid' in operation and operation['opid'] in _opids:
active_opid.append(operation['opid'])

kill_command = ''
for opid in active_opid:
kill_command = kill_command + "db.killOp({});".format(opid)

result['data'] = kill_command

else:
thread_ids = json.loads(thread_ids)
query_engine = get_engine(instance=instance)
sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = query_engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
result['data'] = kill_sql
result = {'status': 1, 'msg': '暂时不支持%s类型数据库通过进程id构建请求' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')
# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type='application/json')


# 问题诊断--终止会话
# 问题诊断--终止会话 这里是实际执行kill的操作
@permission_required('sql.process_kill', raise_exception=True)
def kill_session(request):
instance_name = request.POST.get('instance_name')
thread_ids = request.POST.get('ThreadIDs')
result = {'status': 0, 'msg': 'ok', 'data': []}

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_kill_session(request)
engine = get_engine(instance=instance)
if instance.db_type == 'mysql':
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_kill_session(request)
else:
thread_ids = json.loads(thread_ids)

sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
engine.execute('information_schema', kill_sql)

elif instance.db_type == 'mongo':
conn = engine.get_connection()
db = conn.admin

for opid in json.loads(thread_ids):
conn.admin.command({ "killOp": 1, "op": opid})

else:
thread_ids = json.loads(thread_ids)
engine = get_engine(instance=instance)
sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
engine.execute('information_schema', kill_sql)
result = {'status': 1, 'msg': '暂时不支持%s类型数据库终止会话' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
Expand All @@ -121,11 +208,15 @@ def tablesapce(request):
instance_name = request.POST.get('instance_name')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if instance.db_type != 'mysql':
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的表空间信息查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_sapce_status(request)
Expand Down Expand Up @@ -164,11 +255,15 @@ def trxandlocks(request):
instance_name = request.POST.get('instance_name')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if instance.db_type != 'mysql':
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的锁等待查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

query_engine = get_engine(instance=instance)
server_version = query_engine.server_version
if server_version < (8, 0, 1):
Expand Down Expand Up @@ -247,11 +342,15 @@ def innodb_trx(request):
instance_name = request.POST.get('instance_name')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if instance.db_type != 'mysql':
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的长事务查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

query_engine = get_engine(instance=instance)
sql = '''select trx.trx_started,
trx.trx_state,
Expand Down
Loading

0 comments on commit 8b7bbf9

Please sign in to comment.