forked from Ayyoubzadeh/ESP32-Wiznet-W5500-Micropython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwiznet5k_dhcp.py
499 lines (440 loc) · 17.4 KB
/
wiznet5k_dhcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# SPDX-FileCopyrightText: 2009 Jordan Terell (blog.jordanterrell.com)
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck @ Silicognition LLC
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT
"""
`wiznet5k_dhcp`
================================================================================
Pure-Python implementation of Jordan Terrell's DHCP library v0.3
* Author(s): Jordan Terrell, Brent Rubell, Vincenzo D'Angelo
"""
import gc
import time
import wiznet5k_socket as socket
from random import randint
from micropython import const
from wiznet5k_socket import htonl, htons
# DHCP State Machine
STATE_DHCP_START = const(0x00)
STATE_DHCP_DISCOVER = const(0x01)
STATE_DHCP_REQUEST = const(0x02)
STATE_DHCP_LEASED = const(0x03)
STATE_DHCP_REREQUEST = const(0x04)
STATE_DHCP_RELEASE = const(0x05)
STATE_DHCP_WAIT = const(0x06)
STATE_DHCP_DISCONN = const(0x07)
# DHCP wait time between attempts
DHCP_WAIT_TIME = const(60)
# DHCP Message Types
DHCP_DISCOVER = const(1)
DHCP_OFFER = const(2)
DHCP_REQUEST = const(3)
DHCP_DECLINE = const(4)
DHCP_ACK = const(5)
DHCP_NAK = const(6)
DHCP_RELEASE = const(7)
DHCP_INFORM = const(8)
# DHCP Message OP Codes
DHCP_BOOT_REQUEST = const(0x01)
DHCP_BOOT_REPLY = const(0x02)
DHCP_HTYPE10MB = const(0x01)
DHCP_HTYPE100MB = const(0x02)
DHCP_HLENETHERNET = const(0x06)
DHCP_HOPS = const(0x00)
MAGIC_COOKIE = const(0x63825363)
MAX_DHCP_OPT = const(0x10)
# Default DHCP Server port
DHCP_SERVER_PORT = const(67)
# DHCP Lease Time, in seconds
DEFAULT_LEASE_TIME = const(900)
BROADCAST_SERVER_ADDR = (255, 255, 255, 255)
# DHCP Response Options
MSG_TYPE = 53
SUBNET_MASK = 1
ROUTERS_ON_SUBNET = 3
DNS_SERVERS = 6
DHCP_SERVER_ID = 54
T1_VAL = 58
T2_VAL = 59
LEASE_TIME = 51
OPT_END = 255
# Packet buffer
_BUFF = bytearray(318)
class DHCP:
"""W5k DHCP Client implementation.
:param eth: Wiznet 5k object
:param list mac_address: Hardware MAC.
:param str hostname: The desired hostname, with optional {} to fill in MAC.
:param int response_timeout: DHCP Response timeout.
:param bool debug: Enable debugging output.
"""
# pylint: disable=too-many-arguments, too-many-instance-attributes, invalid-name
def __init__(
self, eth, mac_address, hostname=None, response_timeout=30, debug=False
):
self._debug = debug
self._response_timeout = response_timeout
self._mac_address = mac_address
# Set socket interface
socket.set_interface(eth)
self._eth = eth
self._sock = None
# DHCP state machine
self._dhcp_state = STATE_DHCP_START
self._initial_xid = 0
self._transaction_id = 0
self._start_time = 0
# DHCP server configuration
self.dhcp_server_ip = BROADCAST_SERVER_ADDR
self.local_ip = 0
self.gateway_ip = 0
self.subnet_mask = 0
self.dns_server_ip = 0
# Lease configuration
self._lease_time = 0
self._last_lease_time = 0
self._renew_in_sec = 0
self._rebind_in_sec = 0
self._t1 = 0
self._t2 = 0
# Select an initial transaction id
self._transaction_id = randint(1, 0x7FFFFFFF)
# Host name
mac_string = "".join("{:02X}".format(o) for o in mac_address)
self._hostname = bytes(
(hostname or "WIZnet{}").split(".")[0].format(mac_string)[:42], "utf-8"
)
# pylint: disable=too-many-statements
def send_dhcp_message(self, state, time_elapsed, renew=False):
"""Assemble and send a DHCP message packet to a socket.
:param int state: DHCP Message state.
:param float time_elapsed: Number of seconds elapsed since DHCP process started
:param bool renew: Set True for renew and rebind
"""
_BUFF[:] = b"\x00" * len(_BUFF)
# OP
_BUFF[0] = DHCP_BOOT_REQUEST
# HTYPE
_BUFF[1] = DHCP_HTYPE10MB
# HLEN
_BUFF[2] = DHCP_HLENETHERNET
# HOPS
_BUFF[3] = DHCP_HOPS
# Transaction ID (xid)
self._initial_xid = htonl(self._transaction_id)
self._initial_xid = self._initial_xid.to_bytes(4, "l")
_BUFF[4:7] = self._initial_xid
# seconds elapsed
_BUFF[8] = (int(time_elapsed) & 0xFF00) >> 8
_BUFF[9] = int(time_elapsed) & 0x00FF
# flags
flags = htons(0x8000)
flags = flags.to_bytes(2, "b")
_BUFF[10] = flags[1]
_BUFF[11] = flags[0]
# NOTE: Skipping ciaddr/yiaddr/siaddr/giaddr
# as they're already set to 0.0.0.0
# Except when renewing, then fill in ciaddr
if renew:
_BUFF[12:15] = bytes(self.local_ip)
# chaddr
_BUFF[28:34] = self._mac_address
# NOTE: 192 octets of 0's, BOOTP legacy
# Magic Cookie
_BUFF[236] = (MAGIC_COOKIE >> 24) & 0xFF
_BUFF[237] = (MAGIC_COOKIE >> 16) & 0xFF
_BUFF[238] = (MAGIC_COOKIE >> 8) & 0xFF
_BUFF[239] = MAGIC_COOKIE & 0xFF
# Option - DHCP Message Type
_BUFF[240] = 53
_BUFF[241] = 0x01
_BUFF[242] = state
# Option - Client Identifier
_BUFF[243] = 61
# Length
_BUFF[244] = 0x07
# HW Type - ETH
_BUFF[245] = 0x01
# Client MAC Address
for mac in range(0, len(self._mac_address)):
_BUFF[246 + mac] = self._mac_address[mac]
# Option - Host Name
_BUFF[252] = 12
hostname_len = len(self._hostname)
after_hostname = 254 + hostname_len
_BUFF[253] = hostname_len
_BUFF[254:after_hostname] = self._hostname
if state == DHCP_REQUEST and not renew:
# Set the parsed local IP addr
_BUFF[after_hostname] = 50
_BUFF[after_hostname + 1] = 0x04
_BUFF[after_hostname + 2 : after_hostname + 6] = bytes(self.local_ip)
# Set the parsed dhcp server ip addr
_BUFF[after_hostname + 6] = 54
_BUFF[after_hostname + 7] = 0x04
_BUFF[after_hostname + 8 : after_hostname + 12] = bytes(self.dhcp_server_ip)
_BUFF[after_hostname + 12] = 55
_BUFF[after_hostname + 13] = 0x06
# subnet mask
_BUFF[after_hostname + 14] = 1
# routers on subnet
_BUFF[after_hostname + 15] = 3
# DNS
_BUFF[after_hostname + 16] = 6
# domain name
_BUFF[after_hostname + 17] = 15
# renewal (T1) value
_BUFF[after_hostname + 18] = 58
# rebinding (T2) value
_BUFF[after_hostname + 19] = 59
_BUFF[after_hostname + 20] = 255
# Send DHCP packet
self._sock.send(_BUFF)
# pylint: disable=too-many-branches, too-many-statements
def parse_dhcp_response(self):
"""Parse DHCP response from DHCP server.
Returns DHCP packet type.
"""
# store packet in buffer
_BUFF = self._sock.recv()
if self._debug:
print("DHCP Response: ", _BUFF)
# -- Parse Packet, FIXED -- #
# Validate OP
assert (
_BUFF[0] == DHCP_BOOT_REPLY
), "Malformed Packet - \
DHCP message OP is not expected BOOT Reply."
xid = _BUFF[4:8]
if bytes(xid) < self._initial_xid:
print("f")
return 0, 0
self.local_ip = tuple(_BUFF[16:20])
if _BUFF[28:34] == 0:
return 0, 0
if int.from_bytes(_BUFF[235:240], "l") != MAGIC_COOKIE:
return 0, 0
# -- Parse Packet, VARIABLE -- #
ptr = 240
while _BUFF[ptr] != OPT_END:
if _BUFF[ptr] == MSG_TYPE:
ptr += 1
opt_len = _BUFF[ptr]
ptr += opt_len
msg_type = _BUFF[ptr]
ptr += 1
elif _BUFF[ptr] == SUBNET_MASK:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.subnet_mask = tuple(_BUFF[ptr : ptr + opt_len])
ptr += opt_len
elif _BUFF[ptr] == DHCP_SERVER_ID:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.dhcp_server_ip = tuple(_BUFF[ptr : ptr + opt_len])
ptr += opt_len
elif _BUFF[ptr] == LEASE_TIME:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._lease_time = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == ROUTERS_ON_SUBNET:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.gateway_ip = tuple(_BUFF[ptr : ptr + opt_len])
ptr += opt_len
elif _BUFF[ptr] == DNS_SERVERS:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.dns_server_ip = tuple(_BUFF[ptr : ptr + 4])
ptr += opt_len # still increment even though we only read 1 addr.
elif _BUFF[ptr] == T1_VAL:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._t1 = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == T2_VAL:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._t2 = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == 0:
break
else:
# We're not interested in this option
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
# no-op
ptr += opt_len
if self._debug:
print(
"Msg Type: {}\nSubnet Mask: {}\nDHCP Server IP: {}\nDNS Server IP: {}\
\nGateway IP: {}\nLocal IP: {}\nT1: {}\nT2: {}\nLease Time: {}".format(
msg_type,
self.subnet_mask,
self.dhcp_server_ip,
self.dns_server_ip,
self.gateway_ip,
self.local_ip,
self._t1,
self._t2,
self._lease_time,
)
)
gc.collect()
return msg_type, xid
# pylint: disable=too-many-branches, too-many-statements
def _dhcp_state_machine(self):
"""DHCP state machine without wait loops to enable cooperative multi tasking
This state machine is used both by the initial blocking lease request and
the non-blocking DHCP maintenance function"""
if self._eth.link_status:
if self._dhcp_state == STATE_DHCP_DISCONN:
self._dhcp_state = STATE_DHCP_START
else:
if self._dhcp_state != STATE_DHCP_DISCONN:
self._dhcp_state = STATE_DHCP_DISCONN
self.dhcp_server_ip = BROADCAST_SERVER_ADDR
self._last_lease_time = 0
reset_ip = (0, 0, 0, 0)
self._eth.ifconfig = (reset_ip, reset_ip, reset_ip, reset_ip)
if self._sock is not None:
self._sock.close()
self._sock = None
if self._dhcp_state == STATE_DHCP_START:
self._start_time = time.time()
self._transaction_id = (self._transaction_id + 1) & 0x7FFFFFFF
try:
self._sock = socket.socket(type=socket.SOCK_DGRAM)
except RuntimeError:
if self._debug:
print("* DHCP: Failed to allocate socket")
self._dhcp_state = STATE_DHCP_WAIT
else:
self._sock.settimeout(self._response_timeout)
self._sock.bind((None, 68))
self._sock.connect((self.dhcp_server_ip, DHCP_SERVER_PORT))
if self._last_lease_time == 0 or time.time() > (
self._last_lease_time + self._lease_time
):
if self._debug:
print("* DHCP: Send discover to {}".format(self.dhcp_server_ip))
self.send_dhcp_message(
STATE_DHCP_DISCOVER, (time.time() - self._start_time)
)
self._dhcp_state = STATE_DHCP_DISCOVER
else:
if self._debug:
print("* DHCP: Send request to {}".format(self.dhcp_server_ip))
self.send_dhcp_message(
DHCP_REQUEST, (time.time() - self._start_time), True
)
self._dhcp_state = STATE_DHCP_REQUEST
elif self._dhcp_state == STATE_DHCP_DISCOVER:
if self._sock.available():
if self._debug:
print("* DHCP: Parsing OFFER")
msg_type, xid = self.parse_dhcp_response()
if msg_type == DHCP_OFFER:
# Check if transaction ID matches, otherwise it may be an offer
# for another device
if htonl(self._transaction_id) == int.from_bytes(xid, "l"):
if self._debug:
print(
"* DHCP: Send request to {}".format(self.dhcp_server_ip)
)
self._transaction_id = (self._transaction_id + 1) & 0x7FFFFFFF
self.send_dhcp_message(
DHCP_REQUEST, (time.time() - self._start_time)
)
self._dhcp_state = STATE_DHCP_REQUEST
else:
if self._debug:
print("* DHCP: Received OFFER with non-matching xid")
else:
if self._debug:
print("* DHCP: Received DHCP Message is not OFFER")
elif self._dhcp_state == STATE_DHCP_REQUEST:
if self._sock.available():
if self._debug:
print("* DHCP: Parsing ACK")
msg_type, xid = self.parse_dhcp_response()
# Check if transaction ID matches, otherwise it may be
# for another device
if htonl(self._transaction_id) == int.from_bytes(xid, "l"):
if msg_type == DHCP_ACK:
if self._debug:
print("* DHCP: Successful lease")
self._sock.close()
self._sock = None
self._dhcp_state = STATE_DHCP_LEASED
self._last_lease_time = self._start_time
if self._lease_time == 0:
self._lease_time = DEFAULT_LEASE_TIME
if self._t1 == 0:
# T1 is 50% of _lease_time
self._t1 = self._lease_time >> 1
if self._t2 == 0:
# T2 is 87.5% of _lease_time
self._t2 = self._lease_time - (self._lease_time >> 3)
self._renew_in_sec = self._t1
self._rebind_in_sec = self._t2
self._eth.ifconfig = (
self.local_ip,
self.subnet_mask,
self.gateway_ip,
self.dns_server_ip,
)
gc.collect()
else:
if self._debug:
print("* DHCP: Received DHCP Message is not ACK")
else:
if self._debug:
print("* DHCP: Received non-matching xid")
elif self._dhcp_state == STATE_DHCP_WAIT:
if time.time() > (self._start_time + DHCP_WAIT_TIME):
if self._debug:
print("* DHCP: Begin retry")
self._dhcp_state = STATE_DHCP_START
if time.time() > (self._last_lease_time + self._rebind_in_sec):
self.dhcp_server_ip = BROADCAST_SERVER_ADDR
if time.time() > (self._last_lease_time + self._lease_time):
reset_ip = (0, 0, 0, 0)
self._eth.ifconfig = (reset_ip, reset_ip, reset_ip, reset_ip)
elif self._dhcp_state == STATE_DHCP_LEASED:
if time.time() > (self._last_lease_time + self._renew_in_sec):
self._dhcp_state = STATE_DHCP_START
if self._debug:
print("* DHCP: Time to renew lease")
if (
self._dhcp_state == STATE_DHCP_DISCOVER
or self._dhcp_state == STATE_DHCP_REQUEST
) and time.time() > (self._start_time + self._response_timeout):
self._dhcp_state = STATE_DHCP_WAIT
if self._sock is not None:
self._sock.close()
self._sock = None
def request_dhcp_lease(self):
"""Request to renew or acquire a DHCP lease."""
if self._dhcp_state == STATE_DHCP_LEASED or self._dhcp_state == STATE_DHCP_WAIT:
self._dhcp_state = STATE_DHCP_START
while (
self._dhcp_state != STATE_DHCP_LEASED
and self._dhcp_state != STATE_DHCP_WAIT
):
self._dhcp_state_machine()
return self._dhcp_state == STATE_DHCP_LEASED
def maintain_dhcp_lease(self):
"""Maintain DHCP lease"""
self._dhcp_state_machine()