-
Notifications
You must be signed in to change notification settings - Fork 1
/
net-listeners-unified.py
executable file
·358 lines (285 loc) · 11.8 KB
/
net-listeners-unified.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
#!/usr/bin/env python3
""" Output a colorized list of listening addresses with owners.
This is the unification, with duplication removed, of:
- net-listeners-proc.py
- net-listeners-proc-custom.py
During development it was useful to have multiple files to test various
approaches at the same time, but each design converged on a common set
of data structures and a common set of approaches to final reporting.
"""
import collections
import subprocess
import codecs
import socket
import struct
import glob
import sys
import re
import os
# External command whose stdout is the terminal's column count.
# ("stty size" would be an alternative source for the same information.)
TERMINAL_WIDTH = "/usr/bin/tput cols"

# Module-wide registry of listening sockets, keyed by socket inode number.
# It is deliberately a shared global: the gathering functions fill it in and
# the reporting code reads it, without it being passed around as an argument.
inodes = dict()
class Color:
    """Namespace of ANSI terminal escape sequences used for output styling."""

    # Bright foreground colours.
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Reset every colour/attribute back to the terminal default.
    END = '\033[0m'


# Semantic aliases used by the report code, so the palette can be changed in
# one place.  Note that the "warning" rows intentionally use the FAIL colour.
COLOR_END = Color.END
COLOR_HEADER = Color.HEADER
COLOR_OKAY = Color.OKBLUE
COLOR_WARNING = Color.FAIL
# Prefixes of addresses that are normally not reachable from the public
# internet.  The pattern is compiled with re.VERBOSE, so whitespace and
# '#' comments inside it are ignored.
#
# Every alternative MUST be separated by '|'.  A missing '|' silently fuses
# two alternatives into one sequence that never matches.
#
# The IPv6 prefixes are anchored on a full four-digit first group followed by
# ':' so that a compressed address such as "fc::1" (which is 00fc::1, a
# routable address) is not mistaken for fc00::/7.
NON_ROUTABLE_REGEX = r"""^(
    (127\.)                        |  # 127.0.0.0/8     loopback
    (192\.168\.)                   |  # 192.168.0.0/16  private
    (10\.)                         |  # 10.0.0.0/8      private
    (169\.254\.)                   |  # 169.254.0.0/16  link-local
    (172\.1[6-9]\.)                |  # 172.16.0.0/12   private (172.16 - 172.19)
    (172\.2[0-9]\.)                |  #                         (172.20 - 172.29)
    (172\.3[0-1]\.)                |  #                         (172.30 - 172.31)
    (::1)                          |  # ::1             loopback
    ([fF][eE][89aAbB][0-9a-fA-F]:) |  # fe80::/10       link-local (fe80 - febf)
    ([fF][cCdD][0-9a-fA-F]{2}:)       # fc00::/7        unique local (covers fd00::/8)
)"""

likelyLocalOnly = re.compile(NON_ROUTABLE_REGEX, re.VERBOSE)
def run(thing):
    """Start ``thing`` as a child process and return its handle immediately.

    The command string is split on whitespace into an argument vector (no
    shell is involved) and the child's stdout is connected to a pipe.
    ``subprocess.Popen`` is used rather than ``subprocess.check_output`` so
    the caller is not blocked while the command runs; the output can be
    collected later from the returned ``Popen`` object.
    """
    argv = thing.split()
    child = subprocess.Popen(argv, stdout=subprocess.PIPE)
    return child
def readOutput(ranCommand):
    """Wait for a previously started command and return its stdout lines.

    ``ranCommand`` is a process handle exposing ``communicate()``.  Its
    captured stdout is decoded as UTF-8, surrounding whitespace is stripped,
    and the remainder is returned as a list with one string per line.
    """
    captured = ranCommand.communicate()[0]
    text = captured.decode('utf-8')
    return text.strip().splitlines()
def iterateProcNetworkingGatherListeningInodes():
    """Record every listening TCP/UDP socket (IPv4 and IPv6) in ``inodes``.

    Parses the four files /proc/net/{tcp,udp}{,6}.  For each row in the
    listening state an entry is stored in the module-level ``inodes`` dict:

        inodes[inode] = [ip, port, proto]

    where ``inode`` and ``port`` are ints, ``ip`` is the printable address
    and ``proto`` is one of 'tcp', 'tcp6', 'udp' or 'udp6'.

    A file that does not exist is skipped instead of aborting the whole scan
    (presumably the case for the *6 files when IPv6 is unavailable).
    """

    def ipv6(addr):
        """ Convert /proc IPv6 hex address into standard IPv6 notation. """
        # turn ASCII hex address into binary
        addr = codecs.decode(addr, "hex")

        # unpack into 4 32-bit integers in big endian / network byte order
        addr = struct.unpack('!LLLL', addr)

        # re-pack as 4 32-bit integers in system native byte order
        addr = struct.pack('@IIII', *addr)

        # now we can use standard network APIs to format the address
        return socket.inet_ntop(socket.AF_INET6, addr)

    def ipv4(addr):
        """ Convert /proc IPv4 hex address into standard IPv4 notation. """
        # A 4 byte hex string converts directly to an integer with python
        # radix conversion; int(addr, 16) gives the same value as decoding
        # the hex and unpacking it with struct format ">L".
        addr = int(addr, 16)

        # system native byte order, 4-byte integer
        addr = struct.pack("=L", addr)

        return socket.inet_ntop(socket.AF_INET, addr)

    def processProc(name):
        """ Process IPv4 and IPv6 versions of listeners based on ``name``.

        ``name`` is either 'udp' or 'tcp' so we parse, for each ``name``:
            - /proc/net/[name]
            - /proc/net/[name]6
        """
        # /proc/net/udp{,6} uses a different constant for LISTENING than
        # /proc/net/tcp{,6}.  These constants are based on enum offsets
        # inside the Linux kernel itself.  They aren't likely to ever change
        # since they are hardcoded in utilities.
        listeningState = "07" if name == "udp" else "0A"

        # ipv4 has no suffix, while ipv6 has 6 appended.
        for ver in ["", "6"]:
            try:
                with open(f"/proc/net/{name}{ver}", 'r') as proto:
                    rows = proto.read().splitlines()
            except FileNotFoundError:
                # This protocol family is not exposed on this system.
                continue

            for row in rows[1:]:  # rows[0] is the header row
                cxn = row.split()

                # Right now this is a single-purpose tool so if the socket is
                # not listening, we avoid further processing of this row.
                if cxn[3] != listeningState:
                    continue

                ip, port = cxn[1].split(':')
                ip = ipv6(ip) if ver else ipv4(ip)
                port = int(port, 16)
                inode = cxn[9]

                # We just use a list here because creating a new sub-dict
                # for each entry was noticeably slower than list indexing.
                inodes[int(inode)] = [ip, port, f"{name}{ver}"]

    processProc("tcp")
    processProc("udp")
def generateInodePidMapFromProcGlob():
    """ Loop over every fd in every process in /proc.

    The only way to map an fd back to a process is by looking
    at *every* processes fd and extracting backing inodes.

    It's basically like a big awkward database join where you don't
    have an index on the field you want.

    Also, due to Linux permissions (and Linux security concerns),
    only the root user can read fd listing of processes not owned
    by the current user.

    Returns a ``defaultdict(list)`` mapping socket inode (int) to the list
    of pids (int) that hold an fd backed by that socket.
    """
    inodePidMap = collections.defaultdict(list)

    # glob glob glob it all
    for fd in glob.iglob("/proc/*/fd/*"):
        # fd is a full path shaped like /proc/[pid]/fd/[fd]
        pid = fd.split('/')[2]

        if not pid.isdigit():
            # The glob also matches the /proc/self and /proc/thread-self
            # aliases.  Their "pid" component is not a number (int() would
            # raise) and the same fds are already visited through the
            # numeric /proc/[pid] entry, so counting them would duplicate.
            continue

        try:
            target = os.readlink(fd)
        except OSError:
            # fd or process vanished (or became unreadable) between the
            # directory listing and the readlink; nothing else we can do.
            continue

        # Socket fds read back as: socket:[inode]
        if not target.startswith("socket"):
            continue

        _, inode = target.split(':')

        # strip brackets from inode string (it looks like: [inode])
        inodePidMap[int(inode[1:-1])].append(int(pid))

    return inodePidMap
def generateInodePidMapFromProcCustom(path="/proc/pid_inode_map"):
    """ Read /proc/pid_inode_map to populate inodePidMap.

    Each line is whitespace separated: a pid, a process name, then zero or
    more inode numbers.  The process name is ignored here because the full
    command line is looked up later from the pid.

    ``path`` defaults to the /proc file; it can be overridden to parse a
    saved copy of the same format.

    Returns a ``defaultdict(list)`` mapping inode (int) to the list of pids
    (int) that reference it.  An inode repeated on one line is recorded once
    for that pid.  Blank lines and lines without any inode are skipped.
    """
    inodePidMap = collections.defaultdict(list)

    with open(path, 'r') as pim:
        for line in pim:
            parts = line.split()

            # Need at least: pid, name, one inode.
            if len(parts) < 3:
                continue

            pid = int(parts[0])

            # De-duplicate so a pid is listed once per inode.
            for inode in set(parts[2:]):
                inodePidMap[int(inode)].append(pid)

    return inodePidMap
def addProcessNamesToInodes(inodePidMap):
    """Attach owning processes to every listener in the global ``inodes``.

    ``inodePidMap`` maps a socket inode to a list of pids.  For each inode
    already recorded in ``inodes``, every owning pid's command line is read
    from /proc/[pid]/cmdline and a ``(pid, cmdline)`` tuple is appended to
    that listener's entry, where ``cmdline`` is the list of arguments.

    A pid whose cmdline cannot be read is skipped; the listener itself is
    kept, so it is still reported even when no owner could be resolved.
    """
    for inode, entry in inodes.items():
        # .get() avoids creating empty keys when inodePidMap is a defaultdict
        for pid in inodePidMap.get(inode, ()):
            try:
                # errors="replace": arguments are arbitrary bytes, and an
                # undecodable byte should not hide the whole process.
                with open(f"/proc/{pid}/cmdline", 'r', errors="replace") as cmd:
                    # /proc command line arguments are delimited by
                    # null bytes, so undo that here...
                    cmdline = cmd.read().split('\0')
            except OSError:
                # files can vanish on us at any time (and that's okay!)
                continue

            entry.append((pid, cmdline))
def checkListeners():
    """Collect every listening socket and print a colorized table of them.

    Steps:
      1. start the terminal-width query in the background,
      2. fill the module-level ``inodes`` dict from /proc/net/{tcp,udp}{,6},
      3. map socket inodes to owning pids (from /proc/pid_inode_map when
         that file exists, otherwise by globbing /proc/*/fd/*),
      4. attach (pid, cmdline) pairs to each listener,
      5. print one row per listener (plus one continuation row per extra
         owning process), truncated to the terminal width.

    Rows whose address matches ``likelyLocalOnly`` are printed in
    COLOR_OKAY; everything else is printed in COLOR_WARNING.

    If the module-level flag ``ONLY_LOWEST_PID`` is set and true, only the
    first owning process of each listener is printed.
    """
    # Kick this off first so it runs while we are busy walking /proc.
    try:
        terminalWidth = run(TERMINAL_WIDTH)
    except OSError:
        # The width command could not be started; use the default below.
        terminalWidth = None

    # Parse the four proc files: /proc/net/{tcp,udp}{,6}
    # populates the 'inodes' dict
    iterateProcNetworkingGatherListeningInodes()

    if os.path.isfile("/proc/pid_inode_map"):
        inodePidMap = generateInodePidMapFromProcCustom()
    else:
        inodePidMap = generateInodePidMapFromProcGlob()

    addProcessNamesToInodes(inodePidMap)

    cols = 80
    if terminalWidth is not None:
        try:
            cols = int(readOutput(terminalWidth)[0])
        except (IndexError, ValueError):
            # No output, or output that is not a number: keep the default.
            pass

    # The flag is only assigned when the file runs as a script; default to
    # printing every owner when it is absent.
    onlyFirstPid = globals().get("ONLY_LOWEST_PID", False)

    # Print our own custom output header...
    proto = "Proto"
    addr = "Listening"
    pid = "PID"
    process = "Process"
    print(f"{COLOR_HEADER}{proto:^5} {addr:^25} {pid:>5} {process:^30}")

    # Could sort by anything: ip, port, proto, pid, command name
    # (or even the inode integer if that provided any insight whatsoever)
    def compareByPidOrPort(what):
        _, v = what
        # v = [ip, port, proto, (pid, cmd), ...]
        #      - OR -
        # v = [ip, port, proto]

        # If we're not running as root we can't get pid and command mappings
        # for the processes of other users, so sort the pids we did find at
        # end of list and show UNKNOWN entries first
        # (because the lines will be shorter most likely so the bigger visual
        # weight should be lower in the display table)
        if len(v) > 3:
            # Pid available! Sort by first pid, subsort by IP then port.
            return (1, v[3], v[0], v[1])

        # No pid available! Sort by port number then IP then... port again.
        return (0, v[1], v[0], v[1])

    # Sort results by pid...
    for _, vals in sorted(inodes.items(), key=compareByPidOrPort):
        attachedPids = vals[3:]
        if attachedPids:
            desc = [f"{pid:5} {' '.join(cmd)}" for pid, cmd in attachedPids]
        else:
            # If not running as root, we won't have pid or process, so use
            # defaults
            desc = ["UNKNOWN (must be root for global pid mappings)"]

        proto = vals[2]
        port = vals[1]
        try:
            # Convert port integer to service name if possible.  proto is
            # 'tcp', 'tcp6', 'udp' or 'udp6'; the first three characters are
            # the protocol name the services lookup expects.
            port = socket.getservbyport(port, proto[:3])
        except (OSError, OverflowError):
            # If no match, just use port number directly.
            pass

        addr = f"{vals[0]}:{port}"

        # If IP address looks like it could be visible to the world,
        # throw up a color.
        # Note: due to port forwarding and NAT and other issues,
        #       this clearly isn't exhaustive.
        if likelyLocalOnly.match(addr):
            colorNotice = COLOR_OKAY
        else:
            colorNotice = COLOR_WARNING

        for lineNumber, line in enumerate(desc):
            # Be a polite terminal citizen by limiting our width to user's
            # width.  Colors take up non-visible space, so the first line
            # (the only one carrying the color prefix) gets that many extra
            # characters; continuation lines are limited to cols exactly.
            if lineNumber == 0:
                output = f"{colorNotice}{proto:5} {addr:25} {line}"
                limit = cols + len(colorNotice)
            else:
                output = f"{' ':31} {line}"
                limit = cols

            print(output[:limit])

            if onlyFirstPid:
                break

    print(COLOR_END)
if __name__ == "__main__":
    # Poor-man's option handling: passing *any* command line argument turns
    # on "first pid only" mode, where only the first pid listening on an
    # IP:Port is printed even if multiple child (or other) processes are
    # attached.  If more options are ever needed, pull in argparse instead.
    ONLY_LOWEST_PID = len(sys.argv) > 1

    checkListeners()