-
Notifications
You must be signed in to change notification settings - Fork 1
/
net-listeners-proc-custom.py
executable file
·299 lines (240 loc) · 9.79 KB
/
net-listeners-proc-custom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/env python3
""" Output a colorized list of listening addresses with owners.
This tool parses files in /proc directly to obtain the list
of IPv4 and IPv6 addresses listening on tcp, tcp6, udp, and udp6 ports
also with pids of processes responsible for the listening.
Instead of traversing every /proc/[pid]/fd/* file, this tool uses
a custom proc file of /proc/pid_inode_map telling us which PIDs have
which inodes. The kernel module implementing /proc/pid_inode_map
can be built in the 'pid_inode_map' directory.
Using the custom inode map also lets you run this script without
root yet still obtain the full listening socket to pid/command list.
(but, you obviously need to be root at least once to load the module)
"""
import collections
import subprocess
import codecs
import socket
import struct
import glob
import sys
import re
import os
TERMINAL_WIDTH = "/usr/bin/tput cols" # could also be "stty size"
ONLY_LOWEST_PID = False
# oooh, look, a big dirty global dict collecting all our data without being
# passed around! call the programming police!
inodes = {}
class Color:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
COLOR_HEADER = Color.HEADER
COLOR_OKAY = Color.OKBLUE
COLOR_WARNING = Color.FAIL
COLOR_END = Color.END
# This should capture:
# 127.0.0.0/8
# 192.168.0.0/16
# 10.0.0.0/8
# 169.254.0.0/16
# 172.16.0.0/12
# ::1
# fe80::/10
# fc00::/7
# fd00::/8
NON_ROUTABLE_REGEX = r"""^((127\.) |
(192\.168\.) |
(10\.) |
(169\.254\.) |
(172\.1[6-9]\.) |
(172\.2[0-9]\.) |
(172\.3[0-1]\.) |
(::1) |
([fF][eE]80)
([fF][cCdD]))"""
likelyLocalOnly = re.compile(NON_ROUTABLE_REGEX, re.VERBOSE)
def run(thing):
""" Run any string as an async command invocation. """
# We don't use subprocess.check_output because we want to run all
# processes async
return subprocess.Popen(thing.split(), stdout=subprocess.PIPE)
def readOutput(ranCommand):
""" Return array of rows split by newline from previous invocation. """
stdout, stderr = ranCommand.communicate()
return stdout.decode('utf-8').strip().splitlines()
def procListeners():
""" Wrapper to parse all IPv4 tcp udp, and, IPv6 tcp6 udp6 listeners. """
def processProc(name):
""" Process IPv4 and IPv6 versions of listeners based on ``name``.
``name`` is either 'udp' or 'tcp' so we parse, for each ``name``:
- /proc/net/[name]
- /proc/net/[name]6
As in:
- /proc/net/tcp
- /proc/net/tcp6
- /proc/net/udp
- /proc/net/udp6
"""
def ipv6(addr):
""" Convert /proc IPv6 hex address into standard IPv6 notation. """
# turn ASCII hex address into binary
addr = codecs.decode(addr, "hex")
# unpack into 4 32-bit integers in big endian / network byte order
addr = struct.unpack('!LLLL', addr)
# re-pack as 4 32-bit integers in system native byte order
addr = struct.pack('@IIII', *addr)
# now we can use standard network APIs to format the address
addr = socket.inet_ntop(socket.AF_INET6, addr)
return addr
def ipv4(addr):
""" Convert /proc IPv4 hex address into standard IPv4 notation. """
# Instead of codecs.decode(), we can just convert a 4 byte hex
# string to an integer directly using python radix conversion.
# Basically, int(addr, 16) EQUALS:
# aOrig = addr
# addr = codecs.decode(addr, "hex")
# addr = struct.unpack(">L", addr)
# assert(addr == (int(aOrig, 16),))
addr = int(addr, 16)
# system native byte order, 4-byte integer
addr = struct.pack("=L", addr)
addr = socket.inet_ntop(socket.AF_INET, addr)
return addr
isUDP = name == "udp"
# Iterate four files: /proc/net/{tcp,udp}{,6}
# ipv4 has no prefix, while ipv6 has 6 appended.
for ver in ["", "6"]:
with open(f"/proc/net/{name}{ver}", 'r') as proto:
proto = proto.read().splitlines()
proto = proto[1:] # drop header row
for cxn in proto:
cxn = cxn.split()
# /proc/net/udp{,6} uses different constants for LISTENING
if isUDP:
# These constants are based on enum offsets inside
# the Linux kernel itself. They aren't likely to ever
# change since they are hardcoded in utilities.
isListening = cxn[3] == "07"
else:
isListening = cxn[3] == "0A"
# Right now this is a single-purpose tool so if inode is
# not listening, we avoid further processing of this row.
if not isListening:
continue
ip, port = cxn[1].split(':')
if ver:
ip = ipv6(ip)
else:
ip = ipv4(ip)
port = int(port, 16)
inode = cxn[9]
# We just use a list here because creating a new sub-dict
# for each entry was noticably slower than just indexing
# into lists.
inodes[int(inode)] = [ip, port, f"{name}{ver}"]
processProc("tcp")
processProc("udp")
def addProcessNamesToInodes():
""" Read /proc/pid_inode_map to populate inodePidMap """
inodePidMap = collections.defaultdict(list)
with open("/proc/pid_inode_map", 'r') as pim:
for line in pim:
parts = line.split()
pid = parts[0]
name = parts[1]
pimInodes = set(parts[2:])
for inode in pimInodes:
inodePidMap[int(inode)].append(int(pid))
for inode in inodes:
if inode in inodePidMap:
for pid in inodePidMap[inode]:
try:
with open(f"/proc/{pid}/cmdline", 'r') as cmd:
# /proc command line arguments are delimited by
# null bytes, so undo that here...
cmdline = cmd.read().split('\0')
inodes[inode].append((pid, cmdline))
except BaseException:
# files can vanish on us at any time (and that's okay!)
# But, since the file is gone, we want the entire inodes
# entry gone too:
pass # del inodes[inode]
def checkListenersCustomProc():
terminalWidth = run(TERMINAL_WIDTH)
procListeners()
addProcessNamesToInodes()
tried = inodes
try:
cols = readOutput(terminalWidth)[0]
cols = int(cols)
except BaseException:
cols = 80
# Print our own custom output header...
proto = "Proto"
addr = "Listening"
pid = "PID"
process = "Process"
print(f"{COLOR_HEADER}{proto:^5} {addr:^25} {pid:>5} {process:^30}")
# Could sort by anything: ip, port, proto, pid, command name
# (or even the inode integer if that provided any insight whatsoever)
def compareByPidOrPort(what):
k, v = what
# v = [ip, port, proto, pid, cmd]
# - OR -
# v = [ip, port, proto]
# If we're not running as root we can't pid and command mappings for
# the processes of other users, so sort the pids we did find at end
# of list and show UNKNOWN entries first
# (because the lines will be shorter most likely so the bigger visual
# weight should be lower in the display table)
try:
# Pid available! Sort by first pid, subsort by IP then port.
return (1, v[3], v[0], v[1])
except BaseException:
# No pid available! Sort by port number then IP then... port again.
return (0, v[1], v[0], v[1])
# Sort results by pid...
for name, vals in sorted(tried.items(), key=compareByPidOrPort):
desc = [f"{pid:5} {' '.join(cmd)}" for pid, cmd in vals[3:]]
port = vals[1]
try:
# Convert port integer to service name if possible
port = socket.getservbyport(port)
except BaseException:
# If no match, just use port number directly.
pass
addr = f"{vals[0]}:{port}"
proto = vals[2]
# If IP address looks like it could be visible to the world,
# throw up a color.
# Note: due to port forwarding and NAT and other issues,
# this clearly isn't exhaustive.
if re.match(likelyLocalOnly, addr):
colorNotice = COLOR_OKAY
else:
colorNotice = COLOR_WARNING
isFirstLine = True
for line in desc:
if isFirstLine:
output = f"{colorNotice}{proto:5} {addr:25} {line}"
isFirstLine = False
else:
output = f"{' ':31} {line}"
# Be a polite terminal citizen by limiting our width to user's width
# (colors take up non-visible space, so add it to our col count)
print(output[:cols + (len(colorNotice) if isFirstLine else 0)])
if ONLY_LOWEST_PID:
break
print(COLOR_END)
if __name__ == "__main__":
# cheap hack garbage way of setting one option
# if we need more options, obviously pull in argparse
if len(sys.argv) > 1:
ONLY_LOWEST_PID = True
checkListenersCustomProc()