-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
dhcp_relay.py
305 lines (236 loc) · 10.7 KB
/
dhcp_relay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import click
import ipaddress
import utilities_common.cli as clicommon
# Tables read and written through db.cfgdb: IPv4 relay servers are stored on
# the VLAN entry itself, IPv6 relay servers in a dedicated table.
VLAN_TABLE = "VLAN"
DHCP_RELAY_TABLE = "DHCP_RELAY"

# Field names, inside a table entry, that hold the list of relay servers.
DHCPV4_SERVERS = "dhcp_servers"
DHCPV6_SERVERS = "dhcpv6_servers"

# IP versions, as compared against ipaddress objects' ``version`` attribute.
IPV4 = 4
IPV6 = 6
def validate_ips(ctx, ips, ip_version):
    """Check that every entry of ``ips`` is an address of ``ip_version``.

    Args:
        ctx: Object exposing ``fail(message)``; it is called with a
            description of each offending entry. Callers in this file pass
            the current click context.
        ips: Iterable of addresses to check.
        ip_version: Required IP version, compared against the parsed
            address's ``version`` attribute.

    Returns:
        None.

    Raises:
        Whatever ``ctx.fail`` raises for an unparsable address or an address
        of the wrong version.
    """
    for ip in ips:
        try:
            ip_address = ipaddress.ip_address(ip)
        except ValueError:
            # ipaddress.ip_address() reports an unparsable value with
            # ValueError; anything else is a real error and must propagate.
            ctx.fail("{} is invalid IP address".format(ip))
        else:
            # Only inspect the version when parsing actually succeeded.
            if ip_address.version != ip_version:
                ctx.fail("{} is not IPv{} address".format(ip, ip_version))
def get_dhcp_servers(db, vlan_name, ctx, table_name, dhcp_servers_str, check_is_exist=True):
    """Fetch a VLAN's table entry together with its relay server list.

    Args:
        db: Object whose ``cfgdb`` attribute provides ``get_keys`` and
            ``get_entry``.
        vlan_name: Key of the entry to read (e.g. ``"Vlan1000"``).
        ctx: Object exposing ``fail(message)``, used to report a missing key.
        table_name: Table to read from.
        dhcp_servers_str: Field inside the entry that holds the server list.
        check_is_exist: When true, report via ``ctx.fail`` if ``vlan_name``
            is not among the table's keys.

    Returns:
        A ``(servers, entry)`` tuple. ``servers`` is the list stored under
        ``dhcp_servers_str`` in ``entry`` (the same object, not a copy), or a
        new empty list when the field is absent.
    """
    if check_is_exist and vlan_name not in db.cfgdb.get_keys(table_name):
        ctx.fail("{} doesn't exist".format(vlan_name))

    entry = db.cfgdb.get_entry(table_name, vlan_name)
    return entry.get(dhcp_servers_str, []), entry
def restart_dhcp_relay_service():
    """
    Restart the dhcp_relay service.

    Prints a progress message, then runs ``systemctl stop``,
    ``systemctl reset-failed`` and ``systemctl start`` on the ``dhcp_relay``
    unit, in that order, without echoing the commands.
    """
    click.echo("Restarting DHCP relay service...")
    for action in ('stop', 'reset-failed', 'start'):
        clicommon.run_command(['systemctl', action, 'dhcp_relay'], display_cmd=False)
def add_dhcp_relay(vid, dhcp_relay_ips, db, ip_version):
    """Add relay addresses to a VLAN and restart the dhcp_relay service.

    For ``ip_version`` 6 the addresses live in DHCP_RELAY_TABLE under
    DHCPV6_SERVERS; for any other version they live in VLAN_TABLE under
    DHCPV4_SERVERS.

    Args:
        vid: VLAN id; the entry key is ``"Vlan<vid>"``.
        dhcp_relay_ips: Address strings to add.
        db: Object whose ``cfgdb`` attribute gives access to the tables.
        ip_version: IP version all addresses must have.

    Behaviour:
        * Invalid addresses, wrong-version addresses and addresses repeated
          within ``dhcp_relay_ips`` are reported through ``ctx.fail``.
        * If any address is already configured, a message is printed and the
          function returns without writing anything.
        * Otherwise the entry is written back, a confirmation is printed and
          the service is restarted; a ``SystemExit`` raised by the restart is
          turned into a ``ctx.fail`` message.
    """
    if ip_version == 6:
        table_name, dhcp_servers_str = DHCP_RELAY_TABLE, DHCPV6_SERVERS
    else:
        table_name, dhcp_servers_str = VLAN_TABLE, DHCPV4_SERVERS
    vlan_name = "Vlan{}".format(vid)
    ctx = click.get_current_context()

    # Reject malformed or wrong-version addresses before touching the DB
    validate_ips(ctx, dhcp_relay_ips, ip_version)

    # Only the IPv4 path requires the entry to exist beforehand
    must_exist = ip_version == 4
    dhcp_servers, table = get_dhcp_servers(db, vlan_name, ctx, table_name, dhcp_servers_str, must_exist)

    seen = set()
    for candidate in dhcp_relay_ips:
        # The same address must not appear twice in the request
        if candidate in seen:
            ctx.fail("Error: Find duplicate DHCP relay ip {} in add list".format(candidate))
        # An address that is already configured aborts the whole request
        if candidate in dhcp_servers:
            click.echo("{} is already a DHCP relay for {}".format(candidate, vlan_name))
            return
        dhcp_servers.append(candidate)
        seen.add(candidate)

    table[dhcp_servers_str] = dhcp_servers
    db.cfgdb.set_entry(table_name, vlan_name, table)
    click.echo("Added DHCP relay address [{}] to {}".format(",".join(dhcp_relay_ips), vlan_name))

    try:
        restart_dhcp_relay_service()
    except SystemExit as e:
        ctx.fail("Restart service dhcp_relay failed with error {}".format(e))
def del_dhcp_relay(vid, dhcp_relay_ips, db, ip_version):
    """Remove relay addresses from a VLAN and restart the dhcp_relay service.

    For ``ip_version`` 6 the addresses live in DHCP_RELAY_TABLE under
    DHCPV6_SERVERS; for any other version they live in VLAN_TABLE under
    DHCPV4_SERVERS.

    Args:
        vid: VLAN id; the entry key is ``"Vlan<vid>"``.
        dhcp_relay_ips: Address strings to remove.
        db: Object whose ``cfgdb`` attribute gives access to the tables.
        ip_version: IP version all addresses must have.

    Behaviour:
        * Invalid or wrong-version addresses, a missing entry, addresses
          repeated within ``dhcp_relay_ips`` and addresses that are not
          configured are all reported through ``ctx.fail``.
        * When the last address is removed the server field is dropped from
          the entry; for IPv6, an entry left with no fields is written as
          ``None``.
        * After writing, a confirmation is printed and the service is
          restarted; a ``SystemExit`` raised by the restart is turned into a
          ``ctx.fail`` message.
    """
    if ip_version == 6:
        table_name, dhcp_servers_str = DHCP_RELAY_TABLE, DHCPV6_SERVERS
    else:
        table_name, dhcp_servers_str = VLAN_TABLE, DHCPV4_SERVERS
    vlan_name = "Vlan{}".format(vid)
    ctx = click.get_current_context()

    # Reject malformed or wrong-version addresses before touching the DB
    validate_ips(ctx, dhcp_relay_ips, ip_version)
    dhcp_servers, table = get_dhcp_servers(db, vlan_name, ctx, table_name, dhcp_servers_str)

    seen = set()
    for candidate in dhcp_relay_ips:
        # The same address must not appear twice in the request
        if candidate in seen:
            ctx.fail("Error: Find duplicate DHCP relay ip {} in del list".format(candidate))
        # Only addresses that are currently configured can be removed
        if candidate not in dhcp_servers:
            ctx.fail("{} is not a DHCP relay for {}".format(candidate, vlan_name))
        dhcp_servers.remove(candidate)
        seen.add(candidate)

    if dhcp_servers:
        table[dhcp_servers_str] = dhcp_servers
    else:
        # No servers left: drop the field rather than store an empty list
        del table[dhcp_servers_str]

    # An IPv6 entry with no remaining fields is written as None
    if ip_version == 6 and not table:
        table = None

    db.cfgdb.set_entry(table_name, vlan_name, table)
    click.echo("Removed DHCP relay address [{}] from {}".format(",".join(dhcp_relay_ips), vlan_name))

    try:
        restart_dhcp_relay_service()
    except SystemExit as e:
        ctx.fail("Restart service dhcp_relay failed with error {}".format(e))
def is_dhcp_server_enabled(db):
    """Tell whether the ``dhcp_server`` feature is switched on.

    Reads the ``dhcp_server`` entry of the ``FEATURE`` table through
    ``db.cfgdb`` and returns True only when that entry has a ``state`` field
    equal to ``"enabled"``.
    """
    feature = db.cfgdb.get_entry("FEATURE", "dhcp_server")
    return feature.get("state") == "enabled"
# Top-level "dhcp_relay" command group. register() adds it to the CLI root;
# the "ipv6" and "ipv4" sub-groups below hang off it.
@click.group(cls=clicommon.AbbreviationGroup, name="dhcp_relay")
def dhcp_relay():
    """config DHCP_Relay information"""
    pass
# "ipv6" sub-group of dhcp_relay; a pure container for the "destination" group.
@dhcp_relay.group(cls=clicommon.AbbreviationGroup, name="ipv6")
def dhcp_relay_ipv6():
    pass
# "destination" sub-group of "dhcp_relay ipv6"; holds the add/del commands.
@dhcp_relay_ipv6.group(cls=clicommon.AbbreviationGroup, name="destination")
def dhcp_relay_ipv6_destination():
    pass
# "dhcp_relay ipv6 destination add <vid> <address>...": forwards the VLAN id
# and the one-or-more addresses to add_dhcp_relay() with IPV6.
@dhcp_relay_ipv6_destination.command("add")
@click.argument("vid", metavar="<vid>", required=True, type=int)
@click.argument("dhcp_relay_destinations", nargs=-1, required=True)
@clicommon.pass_db
def add_dhcp_relay_ipv6_destination(db, vid, dhcp_relay_destinations):
    add_dhcp_relay(vid, dhcp_relay_destinations, db, IPV6)
# "dhcp_relay ipv6 destination del <vid> <address>...": forwards the VLAN id
# and the one-or-more addresses to del_dhcp_relay() with IPV6.
@dhcp_relay_ipv6_destination.command("del")
@click.argument("vid", metavar="<vid>", required=True, type=int)
@click.argument("dhcp_relay_destinations", nargs=-1, required=True)
@clicommon.pass_db
def del_dhcp_relay_ipv6_destination(db, vid, dhcp_relay_destinations):
    del_dhcp_relay(vid, dhcp_relay_destinations, db, IPV6)
# "ipv4" sub-group of dhcp_relay; a pure container for the "helper" group.
@dhcp_relay.group(cls=clicommon.AbbreviationGroup, name="ipv4")
def dhcp_relay_ipv4():
    pass
# "helper" sub-group of "dhcp_relay ipv4"; holds the add/del commands.
@dhcp_relay_ipv4.group(cls=clicommon.AbbreviationGroup, name="helper")
def dhcp_relay_ipv4_helper():
    pass
# "dhcp_relay ipv4 helper add <vid> <address>...": adds IPv4 relay addresses
# through add_dhcp_relay(), unless the dhcp_server feature is enabled, in
# which case only a refusal message is printed.
@dhcp_relay_ipv4_helper.command("add")
@click.argument("vid", metavar="<vid>", required=True, type=int)
@click.argument("dhcp_relay_helpers", nargs=-1, required=True)
@clicommon.pass_db
def add_dhcp_relay_ipv4_helper(db, vid, dhcp_relay_helpers):
    if not is_dhcp_server_enabled(db):
        add_dhcp_relay(vid, dhcp_relay_helpers, db, IPV4)
        return
    click.echo("Cannot change ipv4 dhcp_relay configuration when dhcp_server feature is enabled")
# "dhcp_relay ipv4 helper del <vid> <address>...": removes IPv4 relay
# addresses through del_dhcp_relay(), unless the dhcp_server feature is
# enabled, in which case only a refusal message is printed.
@dhcp_relay_ipv4_helper.command("del")
@click.argument("vid", metavar="<vid>", required=True, type=int)
@click.argument("dhcp_relay_helpers", nargs=-1, required=True)
@clicommon.pass_db
def del_dhcp_relay_ipv4_helper(db, vid, dhcp_relay_helpers):
    if not is_dhcp_server_enabled(db):
        del_dhcp_relay(vid, dhcp_relay_helpers, db, IPV4)
        return
    click.echo("Cannot change ipv4 dhcp_relay configuration when dhcp_server feature is enabled")
# "dhcp_relay" sub-command group of the existing "vlan" command: register()
# attaches it to cli.commands['vlan']. It is a separate group object from the
# top-level dhcp_relay group even though both use the name "dhcp_relay".
@click.group(cls=clicommon.AbbreviationGroup, name='dhcp_relay')
def vlan_dhcp_relay():
    pass
@vlan_dhcp_relay.command('add')
@click.argument('vid', metavar='<vid>', required=True, type=int)
@click.argument('dhcp_relay_destination_ips', nargs=-1, required=True)
@clicommon.pass_db
def add_vlan_dhcp_relay_destination(db, vid, dhcp_relay_destination_ips):
    """ Add a destination IP address to the VLAN's DHCP relay """
    # Parameters:
    #   db  - object whose ``cfgdb`` attribute gives access to the VLAN table
    #   vid - VLAN id; the entry key is "Vlan<vid>"
    #   dhcp_relay_destination_ips - one or more IPv4/IPv6 address strings
    #
    # Both address families are stored on the VLAN entry: IPv4 under
    # 'dhcp_servers', everything else under 'dhcpv6_servers'. Addresses that
    # are already configured are skipped with a message. An IPv4 address while
    # the dhcp_server feature is enabled aborts the command without writing.
    ctx = click.get_current_context()
    added_servers = []

    # Verify vlan is valid
    vlan_name = 'Vlan{}'.format(vid)
    vlan = db.cfgdb.get_entry('VLAN', vlan_name)
    if len(vlan) == 0:
        ctx.fail("{} doesn't exist".format(vlan_name))

    # Verify all ip addresses are valid and not exist in DB
    dhcp_servers = vlan.get('dhcp_servers', [])
    dhcpv6_servers = vlan.get('dhcpv6_servers', [])

    for ip_addr in dhcp_relay_destination_ips:
        # Guard only the parse: errors from the DB lookups below must not be
        # misreported as an invalid address.
        try:
            ipaddress.ip_address(ip_addr)
        except ValueError:
            ctx.fail('{} is invalid IP address'.format(ip_addr))

        if (ip_addr in dhcp_servers) or (ip_addr in dhcpv6_servers):
            click.echo("{} is already a DHCP relay destination for {}".format(ip_addr, vlan_name))
            continue

        if clicommon.ipaddress_type(ip_addr) == 4:
            if is_dhcp_server_enabled(db):
                click.echo("Cannot change dhcp_relay configuration when dhcp_server feature is enabled")
                return
            dhcp_servers.append(ip_addr)
        else:
            dhcpv6_servers.append(ip_addr)
        added_servers.append(ip_addr)

    # Append new dhcp servers to config DB
    if len(dhcp_servers):
        vlan['dhcp_servers'] = dhcp_servers
    if len(dhcpv6_servers):
        vlan['dhcpv6_servers'] = dhcpv6_servers

    db.cfgdb.set_entry('VLAN', vlan_name, vlan)

    # Report and restart only when something was actually added
    if len(added_servers):
        click.echo("Added DHCP relay destination addresses {} to {}".format(added_servers, vlan_name))
        try:
            restart_dhcp_relay_service()
        except SystemExit as e:
            ctx.fail("Restart service dhcp_relay failed with error {}".format(e))
@vlan_dhcp_relay.command('del')
@click.argument('vid', metavar='<vid>', required=True, type=int)
@click.argument('dhcp_relay_destination_ips', nargs=-1, required=True)
@clicommon.pass_db
def del_vlan_dhcp_relay_destination(db, vid, dhcp_relay_destination_ips):
    """ Remove a destination IP address from the VLAN's DHCP relay """
    # Parameters:
    #   db  - object whose ``cfgdb`` attribute gives access to the VLAN table
    #   vid - VLAN id; the entry key is "Vlan<vid>"
    #   dhcp_relay_destination_ips - one or more address strings to remove
    #
    # IPv4 addresses are taken out of 'dhcp_servers', all others out of
    # 'dhcpv6_servers'. An IPv4 address while the dhcp_server feature is
    # enabled aborts the command without writing.
    ctx = click.get_current_context()

    # The VLAN entry must exist and be non-empty
    vlan_name = 'Vlan{}'.format(vid)
    vlan = db.cfgdb.get_entry('VLAN', vlan_name)
    if not vlan:
        ctx.fail("{} doesn't exist".format(vlan_name))

    v4_servers = vlan.get('dhcp_servers', [])
    v6_servers = vlan.get('dhcpv6_servers', [])

    for ip_addr in dhcp_relay_destination_ips:
        # Every requested address has to be configured in one of the lists
        if not (ip_addr in v4_servers or ip_addr in v6_servers):
            ctx.fail("{} is not a DHCP relay destination for {}".format(ip_addr, vlan_name))

        if clicommon.ipaddress_type(ip_addr) != 4:
            v6_servers.remove(ip_addr)
            continue

        if is_dhcp_server_enabled(db):
            click.echo("Cannot change dhcp_relay configuration when dhcp_server feature is enabled")
            return
        v4_servers.remove(ip_addr)

    # Store each non-empty list back; drop the field when its list is empty
    for field, servers in (('dhcp_servers', v4_servers), ('dhcpv6_servers', v6_servers)):
        if servers:
            vlan[field] = servers
        else:
            vlan.pop(field, None)

    db.cfgdb.set_entry('VLAN', vlan_name, vlan)
    click.echo("Removed DHCP relay destination addresses {} from {}".format(dhcp_relay_destination_ips, vlan_name))

    try:
        restart_dhcp_relay_service()
    except SystemExit as e:
        ctx.fail("Restart service dhcp_relay failed with error {}".format(e))
def register(cli):
    """Attach this module's command groups to ``cli``.

    The top-level ``dhcp_relay`` group is added to ``cli`` itself, then the
    ``vlan_dhcp_relay`` group is added to the command stored under
    ``cli.commands['vlan']``.
    """
    cli.add_command(dhcp_relay)
    vlan_group = cli.commands['vlan']
    vlan_group.add_command(vlan_dhcp_relay)
# Script entry point: invoke the command groups directly when this file is
# run rather than imported.
# NOTE(review): a click group called like this normally exits the process when
# it finishes, so the second call is presumably never reached - confirm.
if __name__ == '__main__':
    dhcp_relay()
    vlan_dhcp_relay()