-
Notifications
You must be signed in to change notification settings - Fork 0
/
tests.py
260 lines (229 loc) · 5.94 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import logging
from .log import logger
logger.setLevel(logging.DEBUG)
def test_parse_string():
from .parser import ConditionParser
string = """if
条件1
then
动作1
elseif
条件2
then
动作2
else
动作3
end"""
condition = ConditionParser(string)
assert (condition.parsed_string_list) == [['条件1', '动作1'], ['条件2', '动作2'], [True, '动作3']]
string = """if
@fac.sql_type == select
then
@act.reject_execute
else
@act.allow_execute
end
"""
condition = ConditionParser(string)
print(condition.parsed_string_list)
assert condition.parsed_string_list == [['@fac.sql_type == select', '@act.reject_execute'], [True, '@act.allow_execute']]
string = """
if
@fun.char_length > 1000
then
@act.reject_execute
end
"""
condition = ConditionParser(string)
assert condition.parsed_string_list == [['@fun.char_length > 1000', '@act.reject_execute']]
def test_flow_and_in():
from dsl.runner.mysql import MysqlDSL
string = """
if
"select" == @fac.sql_type
then
@act.reject_execute '只能执行查询语句'
elseif
@fun.char_length > 1 and @fac.sql_type in ["select","insert"]
then
@act.allow_execute
end
"""
text = 'select * from table_name'
rule = MysqlDSL(string, text)
assert rule.match_tree() is False
assert rule.end_msg == (False, "'只能执行查询语句'")
def test_flow_and():
from dsl.runner.mysql import MysqlDSL
string = """
if
@fun.char_length > 1 and @fac.sql_type == "select"
then
@act.allow_execute
end
"""
text = 'select * from table_name2'
rule = MysqlDSL(string, text)
assert rule.match_tree() is True
def test_flow_and_more():
from dsl.runner.mysql import MysqlDSL
string = """
if
@fun.char_length > 1 and @fac.sql_type == "select" and @fac.sql_type in ["delete"]
then
@act.allow_execute
end
"""
text = 'select * from table_name3'
rule = MysqlDSL(string, text)
assert rule.match_tree() is None
def test_flow_and_in_notin():
from dsl.runner.mysql import MysqlDSL
string = """
if
@fun.char_length > 1 and @fac.sql_type == "select" and @fac.sql_type not in ["delete"]
then
@act.allow_execute
end
"""
text = 'select * from table_name4'
rule = MysqlDSL(string, text)
assert rule.match_tree() is True
def test_flow_only():
from dsl.runner.mysql import MysqlDSL
string = """
if
@fun.is_admin_user
then
@act.allow_execute
end
"""
text = 'select * from table_name5'
rule = MysqlDSL(string, text)
assert rule.match_tree() is None
def test_flow_mul_flow():
from dsl.runner.mysql import MysqlDSL
string = """
if
false and 100990 < @fac.char_length and @fun.sql_type in @fac.can_run_type
then
@act.allow_execute
elseif
@fun.sql_type != "select" and @fun.is_admin_user
then
@act.allow_execute
elseif
@fac.char_length >= 100
then
@act.allow_execute
elseif
@fun.is_char_lower("UPPER")
then
@act.allow_execute
elseif
false and @fun.is_char_lower(@fac.sql_type) or false
then
@act.allow_execute
else
@act.reject_execute
end
"""
text = 'select * from table_name5'
rule = MysqlDSL(string, text, log=logger)
assert rule.match_tree() is False
def test_flow_or():
from dsl.runner.mysql import MysqlDSL
string = """
if
@fun.char_length < 1 and @fac.sql_type == "select2" or "" or @fac.sql_type not in ["delete"]
then
@act.allow_execute
end
"""
text = 'select * from table_name4'
rule = MysqlDSL(string, text)
assert rule.match_tree() is True
def test_white():
from dsl.runner.mysql import MysqlDSL
string = """
if
true and 1
then
true
end
"""
md = MysqlDSL(string, 'select * from table_name', allow_no_params=True, log=logger)
assert md.match_tree()
def test_space():
from dsl.runner.mysql import MysqlDSL
string = """
if
@fac.char_length <= 100
then
true
end
"""
md = MysqlDSL(string, 'select * from table_name', allow_no_params=True, log=logger)
assert md.match_tree() is True
def test_rtype_value():
from dsl.runner.mysql import MysqlDSL
string = """
if
44 == @fac.char_length
then
@act.allow_execute
elseif
@fac.sql_type not in ["select"] and
then
@act.allow_execute allow select
elseif
@fac.test_dict == {"a":1,"b":2}
then
@act.allow_execute dict ok
else
@act.reject_execute
end
"""
md = MysqlDSL(string, 'select * from table_name', allow_no_params=True, log=logger)
md.match_tree()
assert md.end_msg == (True, 'dict ok')
def test_trackets():
from dsl.runner.mysql import MysqlDSL
# string = """
# if
# (@fac.sql_type not in ["select"] or @fac.char_length > 1) or @fun.is_char_lower(@fac.sql_type) or (1 and 2)
# then
# @act.allow_execute
# end
# """
string = """
if
(1 and False) and 4 or (False or 5) and 4 > 20
then
@act.allow_execute
elseif
(1 or False) and True and ()
then
@act.allow_execute 1
else
@act.reject_execute reject
end
"""
md = MysqlDSL(string, 'select * from table_name', allow_no_params=True, log=logger)
md.match_tree()
assert md.end_msg == (False, 'reject')
def test_regex():
from dsl.runner.mysql import MysqlDSL
string = """
if
"idx_aa" matchs "idx_\\w+"
then
@act.allow_execute success
else
@act.reject_execute failed
end
"""
md = MysqlDSL(string, 'select * from table_name')
md.match_tree()
assert md.end_msg == (True, 'success')
# pytest dsl/tests.py -o log_cli=true