-
Notifications
You must be signed in to change notification settings - Fork 0
/
zennet.py
executable file
·256 lines (205 loc) · 8.12 KB
/
zennet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
#! venv/bin/python3
"""Blazing fast tool for network traffic analysis and working with network protocols
Copyright (C) 2024 Alexeev Bronislav
License: BSD 3-Clause
Link: https://github.com/alexeev-engineer/zennet.
Contacts: bro.alexeev@inbox.ru, t.me/alexeev_dev
"""
import argparse
from functools import cache
from time import monotonic
from rich import print
from rich.tree import Tree
from paintlog.logger import pydbg_obj, Logger
from tqdm.asyncio import tqdm
import asyncio
# Project modules
from znet.request.get import send_request_get
from znet.scanners.port_scanner import scan_port
from znet.osint.whois import *
from znet.netlib import url_info, check_network, port_info
# Module-wide logger used by the command coroutines below
# (presumably 'zennet.log' is the log file path - confirm against paintlog).
logger = Logger('zennet.log')
# Version string printed in the startup banner by main().
__version__ = '0.3.3'
def write_to_file(filename: str, data: str) -> None:
    """Write output to file.

    The file is opened in write mode, so any existing content is replaced.
    The function is deliberately not memoized: it exists only for its side
    effect, and a cache would silently skip a repeated write of the same
    data to the same path.

    Arguments:
    ---------
     + filename: str - filename
     + data: str - output

    """
    pydbg_obj(filename)

    # The reports contain non-ASCII characters (the emoji in the footer), so
    # do not rely on the platform default encoding.
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(data)
async def send_get(url: str, timeout: int, output: str) -> None:
    """Send a GET request, print a summary tree and save a Markdown report.

    The coroutine is not memoized: caching an ``async def`` stores the
    coroutine object itself, which can be awaited only once, so a second call
    with the same arguments would fail.

    Arguments:
    ---------
     + url: str - url address
     + timeout: int - timeout for connecting
     + output: str - filename for output data

    """
    pydbg_obj(url)
    logger.log(f'Send HTTP-get request on {url}', 'info')

    # The result is read positionally: [0] request description, [1] response
    # code, [2] headers, [3] content, [4] response text (as labelled below).
    response = await send_request_get(url, timeout)

    data_tree = Tree(f"[yellow]{response[0]}")
    data_tree.add(f"[cyan]Response code:{response[1]}")
    data_tree.add(f"[cyan]Headers: {response[2]}")
    data_tree.add(f'[italic]Log saved in {output}')

    report = f'''Report on sending a GET request to a {url}
# Request: {response[0]})
Response code: `{response[1]}`\n
Headers: `{response[2]}`\n
Content:
```html
{response[3]}```
Response text:
```html
{response[4]}
```
Report generated by [zennet ⚡️](https://github.com/alexeev-engineer/zennet)
'''

    write_to_file(output, report)
    print(data_tree)
async def start_port_scanner(url: str, ports: str, max_ports: int, output: str) -> None:
    """Scan ports of a host, print the opened ones and save a report.

    When ``ports`` is given it takes precedence over ``max_ports``; otherwise
    ports ``1..max_ports`` are scanned, capped at 65535 (the highest valid
    port number).

    The coroutine is not memoized: caching an ``async def`` stores the
    coroutine object itself, which can be awaited only once.

    NOTE(review): the source this was restored from had lost its indentation;
    it is assumed that only results ending in ``opened`` go into the report -
    confirm against the intended report format.

    Arguments:
    ---------
     + url: str - url address
     + ports: str - whitespace-separated list of ports for scanning
     + max_ports: int - count of max ports for scanning
     + output: str - filename for output data

    """
    logger.log(f'Start scanning {url}', 'info')

    highest_port = 65535
    # Progress-bar options shared by every tqdm loop below.
    bar_options = {
        'ascii': False,
        'unit': 'task',
        'smoothing': 0.5,
        'colour': 'blue',
        'bar_format': '{desc}: {percentage:3.0f}%| {bar} | {n_fmt}/{total_fmt} {rate_fmt}{postfix}',
    }

    if ports:
        # Ports were listed explicitly. split() without an argument tolerates
        # repeated or surrounding whitespace; parsing up front means a bad
        # value fails before any scan task has been started.
        port_numbers = [int(port) for port in ports.split()]
        creation_label = 'Create tasks'
    else:
        # Only an upper bound was given: scan every port up to it.
        port_numbers = range(1, min(max_ports, highest_port) + 1)
        creation_label = 'Creating tasks'

    tasks = []
    async for port in tqdm(port_numbers, desc=creation_label, **bar_options):
        tasks.append(asyncio.create_task(scan_port(url, port)))

    available_ports = []
    async for task in tqdm(tasks, desc='Launch task', **bar_options):
        result = await task

        # scan_port results are strings whose last word is the port state.
        if result.split(' ')[-1] == 'opened':
            available_ports.append(result)

    if available_ports:
        print(f'[bold green]{" ".join(available_ports)}[/bold green]')

    print(f'List of opened ports: {" ".join(available_ports)}' if available_ports else "No ports open")

    text = f'# Result of scanning URL {url}:\n'
    text += ''.join(f'{result}\n' for result in available_ports)
    text += '\nReport generated by [zennet ⚡️](https://github.com/alexeev-engineer/zennet)'

    logger.log(f'Open ports of {url}: {available_ports}', 'info')
    write_to_file(output, text)
async def osint_ip(ip: str, output: str) -> None:
    """Gather whois, ipwhois and URL information about an address and report it.

    The three lookups run concurrently. The results are printed as a tree and
    saved to ``output``.

    Lines of the form ``name: value`` are split on the first ``': '`` only, so
    a value that itself contains ``': '`` is kept whole.

    Arguments:
    ---------
     + ip: str - IP address (or host) to analyse
     + output: str - filename for output data

    """
    logger.log(f'OSINT IP {ip}', 'info')

    whois_lines, ipwhois_data, url_lines = await asyncio.gather(
        whois_info(ip),
        ipwhois_info(ip),
        url_info(ip),
    )

    text = f'# IP address analysis report {ip}\n'
    data_tree = Tree(f"[green]{ip}")

    # whois_info yields "name: value" strings; the raw line goes to the report.
    for line in whois_lines:
        name, _, data = line.partition(': ')
        data_tree.add(f'[cyan]{name}[/cyan]: {data}')
        text += f'{line}\n'

    data_tree.add('[bold]IPWhois info:')

    # ipwhois_info yields a mapping of field name to value.
    for name, data in ipwhois_data.items():
        data_tree.add(f'[cyan]{name}[/cyan]: {data}')
        text += f'{name}: {data}\n'

    # url_info yields "name: value" strings as well.
    for line in url_lines:
        name, _, data = line.partition(': ')
        data_tree.add(f'[cyan]{name}[/cyan]: {data}')
        text += f'{name}: {data}\n'

    data_tree.add(f'[italic]Log saved in {output}')
    text += '\nReport generated by [zennet ⚡️](https://github.com/alexeev-engineer/zennet)'

    write_to_file(output, text)
    print(data_tree)
async def main() -> None:
    """Parse the command line and run the selected action.

    Exactly one action runs per invocation, chosen in this order: ``--get``,
    ``--portinfo``, local network info (``--localinfo``, or ``--whois`` against
    localhost), ``--whois``, ``--scan-ports``. With no action a hint to use
    ``--help`` is printed. The total run time is printed at the end.

    The coroutine is not memoized: caching an ``async def`` stores the
    coroutine object itself, which can be awaited only once.
    """
    program_started = monotonic()
    print(f'⚡️ zennet v {__version__} @ alexeev-engineer\n')

    parser = argparse.ArgumentParser(description='Blazing fast tool for analysis network traffic')
    parser.add_argument('-u', "--url", type=str, help='URL address')
    parser.add_argument('-o', '--output', type=str, default='zennet.md', help='Filename for output')
    parser.add_argument('-g', '--get', action='store_true', help='Send HTTP-get request')
    parser.add_argument('-sp', '--scan-ports', action='store_true', help='Port scannner')
    parser.add_argument('-p', '--ports', type=str, help='List of ports for scanning (overwrite max ports)')
    parser.add_argument('-mp', '--max-ports', type=int, default=2**16, help='Max ports count for scanning')
    parser.add_argument('-w', '--whois', action='store_true', help='WhoIs IP (needed --url (but IP))')
    parser.add_argument('-l', '--localinfo', action='store_true', help='Info about local network')
    parser.add_argument('-pi', '--portinfo', type=int, help='Info about server port')

    args = parser.parse_args()

    if args.get:
        if args.url:
            start = monotonic()
            await send_get(args.url, 3, args.output)
            total = monotonic() - start
            print(f'Request execution time: {round(total, 4)} seconds')
        else:
            print('URL not set')
    elif args.portinfo is not None:
        # Compare with None so that an explicit "--portinfo 0" is not mistaken
        # for the option being absent.
        info = await port_info(args.portinfo)
        print(info)
        logger.log(f'Info about port {args.portinfo} ({info})')
    elif args.localinfo or (args.whois and args.url in ('localhost', '127.0.0.1')):
        # A whois lookup of the loopback address is answered with local
        # network information instead.
        info = await check_network()
        output_data = '# Analysis local network report\n'
        data_tree = Tree('[green]Information about local network')

        for line in info:
            # Split on the first ': ' only so values containing it stay whole.
            name, _, data = line.partition(': ')
            data_tree.add(f'[cyan]{name}[/cyan]: {data}')
            output_data += f'{name}: {data}\n'

        print(data_tree)
        output_data += '\nReport generated by [zennet ⚡️](https://github.com/alexeev-engineer/zennet)'
        write_to_file(args.output, output_data)
    elif args.whois:
        if args.url:
            start = monotonic()
            await osint_ip(args.url, args.output)
            total = monotonic() - start
            print(f'Whois execution time: {round(total, 4)} seconds')
        else:
            print('URL not set')
    elif args.scan_ports:
        if args.url:
            start = monotonic()
            await start_port_scanner(args.url, args.ports, args.max_ports, args.output)
            total = monotonic() - start
            print(f'Port scanner operating time: {round(total, 4)} seconds')
        else:
            print('URL not set')
    else:
        print('[red]View help with `--help` flag[/red]')

    program_ended = monotonic()
    program_work_time = round(program_ended - program_started, 4)
    print(f'\n⚡️ Software execution time: {program_work_time} seconds')
# Script entry point: run the async CLI only when the file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    asyncio.run(main())