-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscanner.py
226 lines (190 loc) · 7.33 KB
/
scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import sys
import socket
import argparse
import threading
import queue
import os
from datetime import datetime
from packaging.version import parse as parse_version, InvalidVersion
import coloredlogs
import logging
from colorama import init, Fore, Style
import pyfiglet
import signal
# Initialize Colorama
init()
# ANSI color codes
COLORS = {
'light_gray': Fore.WHITE + Style.BRIGHT,
'dimmed_gray': Fore.LIGHTBLACK_EX,
'honey_yellow': Fore.LIGHTYELLOW_EX,
'dim_yellow': Fore.YELLOW + Style.BRIGHT,
'cyan': Fore.CYAN,
'green': Fore.GREEN,
'dimmed_green': Fore.GREEN + Style.DIM,
'red': Fore.RED,
'light_orange': Fore.LIGHTYELLOW_EX,
'white': Fore.WHITE,
'reset': Style.RESET_ALL
}
LOG_DIR = 'logs'
LOG_FILE = os.path.join(LOG_DIR, 'scan.log')
# Set up coloredlogs
coloredlogs.install(level='DEBUG')
# Create log directory if not exists
def create_log_dir():
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
logging.info(f"Log directory created: {LOG_DIR}")
# Log message to file and console
def log_message(level, message):
log_entry = f"{message}"
with open(LOG_FILE, 'a') as log_file:
log_file.write(log_entry + "\n")
color = COLORS.get(level, COLORS['reset'])
logging.info(log_entry)
# Parse SSH version string
def parse_ssh_version(version):
try:
return parse_version(version.replace("p", "."))
except InvalidVersion:
return None
# Check if SSH version is vulnerable
def is_vulnerable(version):
parsed_version = parse_ssh_version(version)
if not parsed_version:
return False, None
# CVE vulnerabilities list with corresponding versions
vulnerabilities = [
(parse_version("2.3.0"), "CVE-2001-0817"),
(parse_version("3.1"), "CVE-2002-0083"),
(parse_version("3.7.1.2"), "CVE-2003-0190"),
(parse_version("4.4"), "CVE-2006-5051, CVE-2008-4109"),
(parse_version("5.0.2"), "CVE-2008-5161"),
(parse_version("7.9.1"), "CVE-2019-6111"),
(parse_version("7.1.1"), "CVE-2016-0777"),
(parse_version("6.0.2"), "CVE-2012-0816"),
(parse_version("6.1.2"), "CVE-2012-0814"),
(parse_version("6.2.3"), "CVE-2013-4548"),
(parse_version("6.6.2"), "CVE-2014-2532"),
(parse_version("6.9.1"), "CVE-2015-5600"),
(parse_version("6.8.2"), "CVE-2015-6563"),
(parse_version("7.0.2"), "CVE-2015-6564"),
(parse_version("7.1.3"), "CVE-2016-3115"),
(parse_version("7.2.3"), "CVE-2016-6210"),
(parse_version("7.2.1"), "CVE-2016-10009"),
(parse_version("6.9.2"), "CVE-2016-10012"),
(parse_version("7.7.1"), "CVE-2018-15473"),
(parse_version("9.8"), "CVE-2024-6387")
]
for v, cve in vulnerabilities:
if parsed_version < v:
return True, cve
return False, None
# Retrieve SSH version from target IP and port
def get_ssh_version(ip, port):
try:
with socket.create_connection((ip, port), timeout=5) as sock:
sock.sendall(b'\x00')
response = sock.recv(1024).decode().strip()
if response.startswith("SSH-2.0-OpenSSH"):
version_info = response.split('-')[2]
return version_info if "OpenSSH_" in version_info else "No Version"
except (socket.error, socket.timeout) as e:
log_message('error', f"Error connecting to {ip}:{port}: {e}")
except Exception as e:
log_message('error', f"Unexpected error retrieving SSH version from {ip}:{port}: {e}")
return None
# Test SSH version vulnerability for a target host
def test_host(target):
if "://" in target:
target = target.split("://")[1].rstrip('/')
ip, port = (target.split(":") + [22])[:2]
port = int(port)
version = get_ssh_version(ip, port)
if version:
version_str = version.split('_')[1] if "OpenSSH_" in version else "unknown"
is_vuln, cve_number = is_vulnerable(version_str)
return ip, port, version, is_vuln, cve_number
return ip, port, "no version", False, None
# Worker function to process targets in a thread
def worker(queue, lock, output_file, stop_event):
while not queue.empty() and not stop_event.is_set():
try:
target = queue.get(timeout=1)
result = test_host(target)
ip, port, version, is_vuln, cve_number = result
with lock:
if is_vuln:
color = COLORS['green']
message = f"{ip}:{port} - {color}{version} - Vulnerable - {cve_number}{COLORS['reset']}"
else:
if version == "no version":
color = COLORS['dim_yellow']
else:
color = COLORS['red']
message = f"{ip}:{port} - {color}{version} - Not Vulnerable{COLORS['reset']}"
log_message('info', message)
if output_file and is_vuln:
with open(output_file, "a") as f:
f.write(f"{ip}:{port} - {version} - {cve_number}\n")
queue.task_done()
except queue.Empty:
pass
except Exception as e:
log_message('error', f"Exception in worker thread: {e}")
log_message('info', f"Worker thread stopped.")
# Main function to start scanning
def main(args):
create_log_dir()
targets = []
# Read targets from file
if os.path.isfile(args.file):
with open(args.file, "r") as f:
targets = [line.strip() for line in f if line.strip()]
else:
log_message('error', f"File not found: {args.file}")
sys.exit(1)
# Setup target queue and thread lock
target_queue = queue.Queue()
lock = threading.Lock()
for target in targets:
target_queue.put(target)
# Event to stop threads on KeyboardInterrupt
stop_event = threading.Event()
def signal_handler(sig, frame):
log_message('info', "Ctrl+C detected. Exiting...")
stop_event.set()
# Register SIGINT handler
signal.signal(signal.SIGINT, signal_handler)
threads = []
# Start worker threads
for _ in range(args.threads):
t = threading.Thread(target=worker, args=(target_queue, lock, args.output, stop_event))
t.start()
threads.append(t)
try:
target_queue.join()
except KeyboardInterrupt:
log_message('info', "Ctrl+C detected. Waiting for threads to finish...")
# Wait for threads to complete
for t in threads:
t.join()
log_message('info', f"Scan completed.")
if __name__ == "__main__":
# Print PyFiglet banner
def banner():
print()
banner_text = pyfiglet.figlet_format("OpenSSH CVE Scanner")
log_message('info', f"{COLORS['cyan']}{banner_text}{COLORS['reset']}")
parser = argparse.ArgumentParser(description='Bulk Scan Tool for OpenSSH CVEs')
parser.add_argument('-f', '--file', required=True, help='Input file with targets (ip:port)')
parser.add_argument('-o', '--output', help='Output file to save vulnerable IPs')
parser.add_argument('-t', '--threads', type=int, default=3, help='Number of threads (default: 3)')
args = parser.parse_args()
# Initialize logging
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s ap1d root[17028] INFO %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# Print banner
banner()
# Start main scanning process
main(args)