66# Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt
77# Modified Work Copyright (c) 2012-2019 Roger Light and others, Paho MQTT Python
88
9+ # pylint: disable=too-many-lines
10+
911"""
1012`adafruit_minimqtt`
1113================================================================================
@@ -126,6 +128,7 @@ class MQTT:
126128 :param str client_id: Optional client identifier, defaults to a unique, generated string.
127129 :param bool is_ssl: Sets a secure or insecure connection with the broker.
128130 :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
131+ :param int recv_timeout: receive timeout, in seconds.
129132 :param socket socket_pool: A pool of socket resources available for the given radio.
130133 :param ssl_context: SSL context for long-lived SSL connections.
131134 :param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks.
@@ -142,6 +145,7 @@ def __init__(
142145 client_id = None ,
143146 is_ssl = True ,
144147 keep_alive = 60 ,
148+ recv_timeout = 10 ,
145149 socket_pool = None ,
146150 ssl_context = None ,
147151 use_binary_mode = False ,
@@ -154,6 +158,7 @@ def __init__(
154158 self ._use_binary_mode = use_binary_mode
155159
156160 self .keep_alive = keep_alive
161+ self ._recv_timeout = recv_timeout
157162 self ._user_data = None
158163 self ._is_connected = False
159164 self ._msg_size_lim = MQTT_MSG_SZ_LIM
@@ -514,6 +519,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
514519 self ._send_str (self ._password )
515520 if self .logger is not None :
516521 self .logger .debug ("Receiving CONNACK packet from broker" )
522+ stamp = time .monotonic ()
517523 while True :
518524 op = self ._wait_for_msg ()
519525 if op == 32 :
@@ -527,6 +533,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
527533 self .on_connect (self , self ._user_data , result , rc [2 ])
528534 return result
529535
536+ if op is None :
537+ if time .monotonic () - stamp > self ._recv_timeout :
538+ raise MMQTTException (
539+ f"No data received from broker for { self ._recv_timeout } seconds."
540+ )
541+
530542 def disconnect (self ):
531543 """Disconnects the MiniMQTT client from the MQTT broker."""
532544 self .is_connected ()
@@ -637,6 +649,7 @@ def publish(self, topic, msg, retain=False, qos=0):
637649 if qos == 0 and self .on_publish is not None :
638650 self .on_publish (self , self ._user_data , topic , self ._pid )
639651 if qos == 1 :
652+ stamp = time .monotonic ()
640653 while True :
641654 op = self ._wait_for_msg ()
642655 if op == 0x40 :
@@ -649,6 +662,12 @@ def publish(self, topic, msg, retain=False, qos=0):
649662 self .on_publish (self , self ._user_data , topic , rcv_pid )
650663 return
651664
665+ if op is None :
666+ if time .monotonic () - stamp > self ._recv_timeout :
667+ raise MMQTTException (
668+ f"No data received from broker for { self ._recv_timeout } seconds."
669+ )
670+
652671 def subscribe (self , topic , qos = 0 ):
653672 """Subscribes to a topic on the MQTT Broker.
654673 This method can subscribe to one topics or multiple topics.
@@ -697,6 +716,7 @@ def subscribe(self, topic, qos=0):
697716 for t , q in topics :
698717 self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
699718 self ._sock .send (packet )
719+ stamp = time .monotonic ()
700720 while True :
701721 op = self ._wait_for_msg ()
702722 if op == 0x90 :
@@ -710,6 +730,12 @@ def subscribe(self, topic, qos=0):
710730 self ._subscribed_topics .append (t )
711731 return
712732
733+ if op is None :
734+ if time .monotonic () - stamp > self ._recv_timeout :
735+ raise MMQTTException (
736+ f"No data received from broker for { self ._recv_timeout } seconds."
737+ )
738+
713739 def unsubscribe (self , topic ):
714740 """Unsubscribes from a MQTT topic.
715741
@@ -747,6 +773,7 @@ def unsubscribe(self, topic):
747773 if self .logger is not None :
748774 self .logger .debug ("Waiting for UNSUBACK..." )
749775 while True :
776+ stamp = time .monotonic ()
750777 op = self ._wait_for_msg ()
751778 if op == 176 :
752779 rc = self ._sock_exact_recv (3 )
@@ -759,6 +786,12 @@ def unsubscribe(self, topic):
759786 self ._subscribed_topics .remove (t )
760787 return
761788
789+ if op is None :
790+ if time .monotonic () - stamp > self ._recv_timeout :
791+ raise MMQTTException (
792+ f"No data received from broker for { self ._recv_timeout } seconds."
793+ )
794+
762795 def reconnect (self , resub_topics = True ):
763796 """Attempts to reconnect to the MQTT broker.
764797
0 commit comments