-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanaged.py
254 lines (214 loc) · 7.9 KB
/
managed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
try:
from ipset import Ipset,Entry
except ModuleNotFoundError:
from .ipset import Ipset,Entry
from time import time
import re
from log import logger
class Net:
    """
    Network access control based on a Linux ipset.

    A single ``hash:mac`` ipset is created with the ``skbinfo`` and ``comment``
    extensions enabled. Each entry is a MAC address, carrying a firewall mark
    (used to select a VPN) and an optional comment (used as the user name).
    Instantiating it requires root privileges.
    For applicable commands, the command they run is indicated. This is
    indicative only, and won't include any optional parameters that are
    actually set.
    """

    def __init__(self, name="langate", mark=(0, 1)):
        """
        Create the ipset used for access control.
        Equivalent to:
        `ipset create langate hash:mac`

        :param name: name of the ipset.
        :param mark: tuple containing min mark and how many vpns to use.
        """
        self.ipset = Ipset(name)
        self.ipset.create("hash:mac", skbinfo=True, comment=True)
        # mark_start is the lowest mark handed out; mark_mod is the number of
        # distinct marks cycled through by connect_user().
        self.mark_start, self.mark_mod = mark
        self.mark_current = 0
        self.logs = []
        logger.debug("Net instance initialized")

    @staticmethod
    def _entry_mark(entry):
        """Return the mark stored in an entry's skbmark, or None if it has none."""
        return entry.skbmark[0] if entry.skbmark else None

    def generate_iptables(self, match_internal="-s 172.16.0.0/255.252.0.0", stop=False):
        """
        Placeholder: currently does nothing and returns None.

        :param match_internal: unused.
        :param stop: unused.
        """
        # TODO either fix this function, or drop it if it's not used

    def connect_user(self, mac, name=None, timeout=None, mark=None):
        """
        Add an entry to the ipset.
        Equivalent to:
        `ipset add langate <mac>`

        :param mac: mac address of the user.
        :param name: name of the entry, stored as comment in the ipset.
        :param timeout: accepted for interface compatibility; currently ignored,
            so entries never expire.
        :param mark: mark for the entry. None to let the module balance users itself.
        """
        if mark is None:
            # Round-robin over the marks mark_start .. mark_start + mark_mod - 1.
            mark = self.mark_current + self.mark_start
            self.mark_current = (self.mark_current + 1) % self.mark_mod
        logger.info("Connecting MAC %s (\"%s\" on mark %s)", mac, name, mark)
        self.ipset.add(Entry(mac, skbmark=mark, comment=name))

    def disconnect_user(self, mac):
        """
        Remove an entry from the ipset.
        Equivalent to:
        `ipset del langate <entry mac>`

        :param mac: mac of the user.
        """
        logger.info("Disconnecting MAC %s", mac)
        self.ipset.delete(mac)

    def get_user_info(self, mac):
        """
        Get a user's information from their mac address.
        Equivalent to:
        `sudo ipset list langate`
        plus some processing

        :param mac: mac address of the user (compared upper-cased against the set).
        :return: User instance holding the mac, mark and name, or None if the
            mac is not in the set.
        """
        logger.info("Querying info about MAC %s", mac)
        wanted = mac.upper()
        for entry in self.ipset.list().entries:
            if entry.elem == wanted:
                mark = self._entry_mark(entry)
                name = entry.comment
                logger.info("MAC %s belongs to user %s mark %s", mac, name, mark)
                return User(mac, mark, name=name)
        logger.warning("Did not find info about MAC %s", mac)
        return None

    def clear(self):
        """
        Remove all entries from the set.
        Equivalent to:
        `ipset flush langate`
        """
        logger.info("Flushing the entire ipset")
        self.ipset.flush()

    def get_all_connected(self):
        """
        Get all entries from the set.
        Equivalent to:
        `ipset list langate`

        :return: Dictionary mapping mac to a User instance.
        """
        users = {}
        for entry in self.ipset.list().entries:
            users[entry.elem] = User(
                entry.elem, self._entry_mark(entry), name=entry.comment
            )
        logger.info("Devices currently connected: %s", len(users))
        return users

    def delete(self):
        """
        Delete the set. After calling this function, the set can't be used
        anymore as it no longer exists.
        Equivalent to:
        `sudo ipset destroy langate`
        """
        logger.debug("Destroying the ipset")
        self.ipset.destroy()

    def get_balance(self):
        """
        Get mapping from vpn to user macs.
        Equivalent to:
        `ipset list langate`

        :return: Dictionary mapping each mark (None for entries without one)
            to the set of mac addresses using it.
        """
        balance = {}
        for entry in self.ipset.list().entries:
            balance.setdefault(self._entry_mark(entry), set()).add(entry.elem)
        logger.debug(
            "Tunnel balance: %s",
            ",".join("{}:{}".format(mark, macs) for mark, macs in balance.items()),
        )
        return balance

    def set_vpn(self, mac, vpn):
        """
        Move a user to a new vpn.
        Does not modify an entry not already in the set.
        Equivalent to:
        `ipset list langate` plus
        `ipset add langate <mac>`

        :param mac: mac address of the user.
        :param vpn: vpn to move the user to; either an int mark (paired with a
            full 32-bit mask) or an already-built skbmark value.
        """
        logger.info("Moving MAC %s over to VPN %s", mac, vpn)
        if isinstance(vpn, int):
            vpn = (vpn, (1 << 32) - 1)  # (mark, mask) with every bit of the mask set
        wanted = mac.upper()
        for entry in self.ipset.list().entries:
            if entry.elem == wanted:
                entry.skbmark = vpn
                self.ipset.add(entry)
                break
        else:
            logger.warning("MAC %s not found", mac)
def verify_mac(mac: str) -> bool:
    """
    Verify if mac address is correctly formed.

    A valid address is exactly six colon-separated pairs of hexadecimal
    digits (upper or lower case), with nothing before or after.

    :param mac: Mac address to verify.
    :return: True if correctly formed, False if not.
    """
    # fullmatch, unlike match() with a trailing `$`, also rejects a trailing newline.
    return re.fullmatch(r'([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}', mac) is not None
def verify_ip(ip: str) -> bool:
    """
    Verify if ip address is correctly formed.

    A valid address is exactly four dot-separated groups of one to three
    ASCII digits, each group being in the range 0-255.

    :param ip: Ip address to verify.
    :return: True if correctly formed, False if not.
    """
    # fullmatch, unlike match() with a trailing `$`, also rejects a trailing newline.
    if re.fullmatch(r'([0-9]{1,3}\.){3}[0-9]{1,3}', ip) is None:
        return False
    # The pattern alone would let values such as 999 through.
    return all(int(octet) <= 255 for octet in ip.split('.'))
def get_ip(mac: str) -> str:
    """
    Get the ip address associated with a given mac address.

    The lookup reads the kernel ARP table from /proc/net/arp and returns the
    first entry whose hardware address equals ``mac`` (case-insensitively).

    NOTE(review): InvalidAddressError is not defined in the visible part of
    this module - confirm it is importable here.

    :param mac: Mac address of the user.
    :return: Ip address of the user.
    :raises InvalidAddressError: if ``mac`` is not a well-formed mac address.
    :raises ValueError: if no ARP entry matches ``mac``.
    """
    logger.info("Querying IP for MAC %s", mac)
    if not verify_mac(mac):
        raise InvalidAddressError("'{}' is not a valid mac address".format(mac))
    wanted = mac.lower()
    with open('/proc/net/arp', 'r') as arp_table:
        next(arp_table, None)  # skip the column header line
        for line in arp_table:
            # Columns: IP address, HW type, Flags, HW address, Mask, Device
            fields = line.split()
            if len(fields) > 3 and fields[3].lower() == wanted:
                ip = fields[0]
                logger.info("Found IP %s for MAC %s", ip, mac)
                return ip
    raise ValueError("'{}' does not have a known ip".format(mac))
# get mac from ip
def get_mac(ip: str) -> str:
    """
    Get the mac address associated with a given ip address.

    The lookup reads the kernel ARP table from /proc/net/arp and returns the
    hardware address of the first entry whose ip address equals ``ip``.

    NOTE(review): InvalidAddressError is not defined in the visible part of
    this module - confirm it is importable here.

    :param ip: Ip address of the user.
    :return: Mac address of the user, as written in the ARP table.
    :raises InvalidAddressError: if ``ip`` is not a well-formed ip address.
    :raises ValueError: if no ARP entry matches ``ip``.
    """
    logger.info("Querying MAC for IP %s", ip)
    if not verify_ip(ip):
        raise InvalidAddressError("'{}' is not a valid ip address".format(ip))
    with open('/proc/net/arp', 'r') as arp_table:
        next(arp_table, None)  # skip the column header line
        for line in arp_table:
            # Columns: IP address, HW type, Flags, HW address, Mask, Device
            fields = line.split()
            if len(fields) > 3 and fields[0] == ip:
                mac = fields[3]
                logger.info("Found MAC %s for IP %s", mac, ip)
                return mac
    raise ValueError("'{}' does not have a known mac".format(ip))
class User:
    """
    Plain record describing a single user.

    It holds the user's mac address, the mark assigned to them and an
    optional name.
    """

    def __init__(self, mac, mark, name=None):
        """
        :param mac: mac address of the user.
        :param mark: mark associated with the user.
        :param name: optional name of the user.
        """
        self.mac, self.mark, self.name = mac, mark, name

    def to_dict(self):
        """
        :return: Dictionary with the "mac", "mark" and "name" of the user.
        """
        return {field: getattr(self, field) for field in ("mac", "mark", "name")}