3434
3535import gc
3636import time
37- from digitalio import Direction
37+ from digitalio import Direction , DigitalInOut
38+
39+ try :
40+ import busio
41+ from typing import Optional , Dict , Union , List
42+ except ImportError :
43+ pass
3844
3945__version__ = "0.0.0-auto.0"
4046__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_espATcontrol.git"
@@ -67,13 +73,13 @@ class ESP_ATcontrol:
6773
6874 def __init__ (
6975 self ,
70- uart ,
71- default_baudrate ,
76+ uart : busio . UART ,
77+ default_baudrate : int ,
7278 * ,
73- run_baudrate = None ,
74- rts_pin = None ,
75- reset_pin = None ,
76- debug = False
79+ run_baudrate : Optional [ int ] = None ,
80+ rts_pin : Optional [ DigitalInOut ] = None ,
81+ reset_pin : Optional [ DigitalInOut ] = None ,
82+ debug : bool = False
7783 ):
7884 """This function doesn't try to do any sync'ing, just sets up
7985 # the hardware, that way nothing can unexpectedly fail!"""
@@ -100,7 +106,7 @@ def __init__(
100106 self ._ifconfig = []
101107 self ._initialized = False
102108
103- def begin (self ):
109+ def begin (self ) -> None :
104110 """Initialize the module by syncing, resetting if necessary, setting up
105111 the desired baudrate, turning on single-socket mode, and configuring
106112 SSL support. Required before using the module but we dont do in __init__
@@ -128,7 +134,7 @@ def begin(self):
128134 except OKError :
129135 pass # retry
130136
131- def connect (self , secrets ) :
137+ def connect (self , secrets : Dict [ str , Union [ str , int ]]) -> None :
132138 """Repeatedly try to connect to an access point with the details in
133139 the passed in 'secrets' dictionary. Be sure 'ssid' and 'password' are
134140 defined in the secrets dict! If 'timezone' is set, we'll also configure
@@ -160,15 +166,23 @@ def connect(self, secrets):
160166 # *************************** SOCKET SETUP ****************************
161167
162168 @property
163- def cipmux (self ):
169+ def cipmux (self ) -> int :
164170 """The IP socket multiplexing setting. 0 for one socket, 1 for multi-socket"""
165171 replies = self .at_response ("AT+CIPMUX?" , timeout = 3 ).split (b"\r \n " )
166172 for reply in replies :
167173 if reply .startswith (b"+CIPMUX:" ):
168174 return int (reply [8 :])
169175 raise RuntimeError ("Bad response to CIPMUX?" )
170176
171- def socket_connect (self , conntype , remote , remote_port , * , keepalive = 10 , retries = 1 ):
177+ def socket_connect (
178+ self ,
179+ conntype : str ,
180+ remote : str ,
181+ remote_port : int ,
182+ * ,
183+ keepalive : int = 10 ,
184+ retries : int = 1
185+ ) -> bool :
172186 """Open a socket. conntype can be TYPE_TCP, TYPE_UDP, or TYPE_SSL. Remote
173187 can be an IP address or DNS (we'll do the lookup for you. Remote port
174188 is integer port on other side. We can't set the local port"""
@@ -199,7 +213,7 @@ def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries
199213 return True
200214 return False
201215
202- def socket_send (self , buffer , timeout = 1 ) :
216+ def socket_send (self , buffer : bytes , timeout : int = 1 ) -> bool :
203217 """Send data over the already-opened socket, buffer must be bytes"""
204218 cmd = "AT+CIPSEND=%d" % len (buffer )
205219 self .at_response (cmd , timeout = 5 , retries = 1 )
@@ -232,7 +246,7 @@ def socket_send(self, buffer, timeout=1):
232246 # Get newlines off front and back, then split into lines
233247 return True
234248
235- def socket_receive (self , timeout = 5 ) :
249+ def socket_receive (self , timeout : int = 5 ) -> bytearray :
236250 # pylint: disable=too-many-nested-blocks, too-many-branches
237251 """Check for incoming data over the open socket, returns bytes"""
238252 incoming_bytes = None
@@ -298,7 +312,7 @@ def socket_receive(self, timeout=5):
298312 gc .collect ()
299313 return ret
300314
301- def socket_disconnect (self ):
315+ def socket_disconnect (self ) -> None :
302316 """Close any open socket, if there is one"""
303317 try :
304318 self .at_response ("AT+CIPCLOSE" , retries = 1 )
@@ -307,7 +321,9 @@ def socket_disconnect(self):
307321
308322 # *************************** SNTP SETUP ****************************
309323
310- def sntp_config (self , enable , timezone = None , server = None ):
324+ def sntp_config (
325+ self , enable : bool , timezone : Optional [int ] = None , server : Optional [str ] = None
326+ ) -> None :
311327 """Configure the built in ESP SNTP client with a UTC-offset number (timezone)
312328 and server as IP or hostname."""
313329 cmd = "AT+CIPSNTPCFG="
@@ -322,7 +338,7 @@ def sntp_config(self, enable, timezone=None, server=None):
322338 self .at_response (cmd , timeout = 3 )
323339
324340 @property
325- def sntp_time (self ):
341+ def sntp_time (self ) -> Union [ bytes , None ] :
326342 """Return a string with time/date information using SNTP, may return
327343 1970 'bad data' on the first few minutes, without warning!"""
328344 replies = self .at_response ("AT+CIPSNTPTIME?" , timeout = 5 ).split (b"\r \n " )
@@ -334,7 +350,7 @@ def sntp_time(self):
334350 # *************************** WIFI SETUP ****************************
335351
336352 @property
337- def is_connected (self ):
353+ def is_connected (self ) -> bool :
338354 """Initialize module if not done yet, and check if we're connected to
339355 an access point, returns True or False"""
340356 if not self ._initialized :
@@ -354,7 +370,7 @@ def is_connected(self):
354370 return False
355371
356372 @property
357- def status (self ):
373+ def status (self ) -> Union [ int , None ] :
358374 """The IP connection status number (see AT+CIPSTATUS datasheet for meaning)"""
359375 replies = self .at_response ("AT+CIPSTATUS" , timeout = 5 ).split (b"\r \n " )
360376 for reply in replies :
@@ -363,7 +379,7 @@ def status(self):
363379 return None
364380
365381 @property
366- def mode (self ):
382+ def mode (self ) -> Union [ int , None ] :
367383 """What mode we're in, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
368384 if not self ._initialized :
369385 self .begin ()
@@ -374,7 +390,7 @@ def mode(self):
374390 raise RuntimeError ("Bad response to CWMODE?" )
375391
376392 @mode .setter
377- def mode (self , mode ) :
393+ def mode (self , mode : int ) -> None :
378394 """Station or AP mode selection, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
379395 if not self ._initialized :
380396 self .begin ()
@@ -383,15 +399,15 @@ def mode(self, mode):
383399 self .at_response ("AT+CWMODE=%d" % mode , timeout = 3 )
384400
385401 @property
386- def local_ip (self ):
402+ def local_ip (self ) -> Union [ str , None ] :
387403 """Our local IP address as a dotted-quad string"""
388404 reply = self .at_response ("AT+CIFSR" ).strip (b"\r \n " )
389405 for line in reply .split (b"\r \n " ):
390406 if line and line .startswith (b'+CIFSR:STAIP,"' ):
391407 return str (line [14 :- 1 ], "utf-8" )
392408 raise RuntimeError ("Couldn't find IP address" )
393409
394- def ping (self , host ) :
410+ def ping (self , host : str ) -> Union [ int , None ] :
395411 """Ping the IP or hostname given, returns ms time or None on failure"""
396412 reply = self .at_response ('AT+PING="%s"' % host .strip ('"' ), timeout = 5 )
397413 for line in reply .split (b"\r \n " ):
@@ -404,7 +420,7 @@ def ping(self, host):
404420 return None
405421 raise RuntimeError ("Couldn't ping" )
406422
407- def nslookup (self , host ) :
423+ def nslookup (self , host : str ) -> Union [ str , None ] :
408424 """Return a dotted-quad IP address strings that matches the hostname"""
409425 reply = self .at_response ('AT+CIPDOMAIN="%s"' % host .strip ('"' ), timeout = 3 )
410426 for line in reply .split (b"\r \n " ):
@@ -415,7 +431,7 @@ def nslookup(self, host):
415431 # *************************** AP SETUP ****************************
416432
417433 @property
418- def remote_AP (self ): # pylint: disable=invalid-name
434+ def remote_AP (self ) -> List [ Union [ int , str , None ]] : # pylint: disable=invalid-name
419435 """The name of the access point we're connected to, as a string"""
420436 stat = self .status
421437 if stat != self .STATUS_APCONNECTED :
@@ -434,7 +450,7 @@ def remote_AP(self): # pylint: disable=invalid-name
434450 return reply
435451 return [None ] * 4
436452
437- def join_AP (self , ssid , password ) : # pylint: disable=invalid-name
453+ def join_AP (self , ssid : str , password : str ) -> None : # pylint: disable=invalid-name
438454 """Try to join an access point by name and password, will return
439455 immediately if we're already connected and won't try to reconnect"""
440456 # First make sure we're in 'station' mode so we can connect to AP's
@@ -456,7 +472,9 @@ def join_AP(self, ssid, password): # pylint: disable=invalid-name
456472 raise RuntimeError ("Didn't get IP address" )
457473 return
458474
459- def scan_APs (self , retries = 3 ): # pylint: disable=invalid-name
475+ def scan_APs ( # pylint: disable=invalid-name
476+ self , retries : int = 3
477+ ) -> Union [List [List [bytes ]], None ]:
460478 """Ask the module to scan for access points and return a list of lists
461479 with name, RSSI, MAC addresses, etc"""
462480 for _ in range (retries ):
@@ -482,11 +500,11 @@ def scan_APs(self, retries=3): # pylint: disable=invalid-name
482500 # ************************** AT LOW LEVEL ****************************
483501
484502 @property
485- def version (self ):
503+ def version (self ) -> Union [ str , None ] :
486504 """The cached version string retrieved via the AT+GMR command"""
487505 return self ._version
488506
489- def get_version (self ):
507+ def get_version (self ) -> Union [ str , None ] :
490508 """Request the AT firmware version string and parse out the
491509 version number"""
492510 reply = self .at_response ("AT+GMR" , timeout = 3 ).strip (b"\r \n " )
@@ -499,12 +517,12 @@ def get_version(self):
499517 self ._version = str (line , "utf-8" )
500518 return self ._version
501519
502- def hw_flow (self , flag ) :
520+ def hw_flow (self , flag : bool ) -> None :
503521 """Turn on HW flow control (if available) on to allow data, or off to stop"""
504522 if self ._rts_pin :
505523 self ._rts_pin .value = not flag
506524
507- def at_response (self , at_cmd , timeout = 5 , retries = 3 ) :
525+ def at_response (self , at_cmd : str , timeout : int = 5 , retries : int = 3 ) -> bytes :
508526 """Send an AT command, check that we got an OK response,
509527 and then cut out the reply lines to return. We can set
510528 a variable timeout (how long we'll wait for response) and
@@ -554,7 +572,7 @@ def at_response(self, at_cmd, timeout=5, retries=3):
554572 return response [:- 4 ]
555573 raise OKError ("No OK response to " + at_cmd )
556574
557- def sync (self ):
575+ def sync (self ) -> bool :
558576 """Check if we have AT commmand sync by sending plain ATs"""
559577 try :
560578 self .at_response ("AT" , timeout = 1 )
@@ -563,12 +581,12 @@ def sync(self):
563581 return False
564582
565583 @property
566- def baudrate (self ):
584+ def baudrate (self ) -> int :
567585 """The baudrate of our UART connection"""
568586 return self ._uart .baudrate
569587
570588 @baudrate .setter
571- def baudrate (self , baudrate ) :
589+ def baudrate (self , baudrate : int ) -> None :
572590 """Change the modules baudrate via AT commands and then check
573591 that we're still sync'd."""
574592 at_cmd = "AT+UART_CUR=" + str (baudrate ) + ",8,1,0,"
@@ -588,14 +606,14 @@ def baudrate(self, baudrate):
588606 if not self .sync ():
589607 raise RuntimeError ("Failed to resync after Baudrate change" )
590608
591- def echo (self , echo ) :
609+ def echo (self , echo : bool ) -> None :
592610 """Set AT command echo on or off"""
593611 if echo :
594612 self .at_response ("ATE1" , timeout = 1 )
595613 else :
596614 self .at_response ("ATE0" , timeout = 1 )
597615
598- def soft_reset (self ):
616+ def soft_reset (self ) -> bool :
599617 """Perform a software reset by AT command. Returns True
600618 if we successfully performed, false if failed to reset"""
601619 try :
@@ -609,13 +627,13 @@ def soft_reset(self):
609627 pass # fail, see below
610628 return False
611629
612- def factory_reset (self ):
630+ def factory_reset (self ) -> None :
613631 """Perform a hard reset, then send factory restore settings request"""
614632 self .hard_reset ()
615633 self .at_response ("AT+RESTORE" , timeout = 1 )
616634 self ._initialized = False
617635
618- def hard_reset (self ):
636+ def hard_reset (self ) -> None :
619637 """Perform a hardware reset by toggling the reset pin, if it was
620638 defined in the initialization of this object"""
621639 if self ._reset_pin :
0 commit comments